allow dds_create_topic multiple times for the same topic
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									8e1bdb8855
								
							
						
					
					
						commit
						e912384500
					
				
					 4 changed files with 154 additions and 88 deletions
				
			
		| 
						 | 
				
			
			@ -195,6 +195,21 @@ void dds_qos_merge
 | 
			
		|||
    _In_ const dds_qos_t * __restrict src
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Copy all QoS-policies from one structure to another, unless already set
 | 
			
		||||
 *
 | 
			
		||||
 * Policies are copied from src to dst, unless src already has the policy set to a non-default value.
 | 
			
		||||
 *
 | 
			
		||||
 * @param[in,out] dst - Pointer to the destination qos structure
 | 
			
		||||
 * @param[in] src - Pointer to the source qos structure
 | 
			
		||||
 */
 | 
			
		||||
DDS_EXPORT
 | 
			
		||||
bool dds_qos_equal
 | 
			
		||||
(
 | 
			
		||||
    _In_ const dds_qos_t * __restrict a,
 | 
			
		||||
    _In_ const dds_qos_t * __restrict b
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Set the userdata of a qos structure.
 | 
			
		||||
 *
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -244,16 +244,30 @@ void dds_qos_merge (
 | 
			
		|||
{
 | 
			
		||||
    if(!src){
 | 
			
		||||
        DDS_ERROR(DDS_RETCODE_BAD_PARAMETER, "Argument source(src) is NULL");
 | 
			
		||||
        return ;
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
    if(!dst){
 | 
			
		||||
        DDS_ERROR(DDS_RETCODE_BAD_PARAMETER, "Argument destination(dst) is NULL");
 | 
			
		||||
        return ;
 | 
			
		||||
        return;
 | 
			
		||||
    }
 | 
			
		||||
    /* Copy qos from source to destination unless already set */
 | 
			
		||||
    nn_xqos_mergein_missing (dst, src);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool dds_qos_equal (
 | 
			
		||||
    _In_opt_ const dds_qos_t * __restrict a,
 | 
			
		||||
    _In_opt_ const dds_qos_t * __restrict b)
 | 
			
		||||
{
 | 
			
		||||
    /* FIXME: a bit of a hack - and I am not so sure I like accepting null pointers here anyway */
 | 
			
		||||
    if (a == NULL && b == NULL) {
 | 
			
		||||
        return true;
 | 
			
		||||
    } else if (a == NULL || b == NULL) {
 | 
			
		||||
        return false;
 | 
			
		||||
    } else {
 | 
			
		||||
        return nn_xqos_delta(a, b, ~(uint64_t)0) != 0;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void dds_qset_userdata(
 | 
			
		||||
    _Inout_ dds_qos_t * __restrict qos,
 | 
			
		||||
    _In_reads_bytes_opt_(sz) const void * __restrict value,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -153,7 +153,7 @@ dds_topic_status_cb(
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
sertopic_t
 | 
			
		||||
dds_topic_lookup(
 | 
			
		||||
dds_topic_lookup_locked(
 | 
			
		||||
        dds_domain *domain,
 | 
			
		||||
        const char *name)
 | 
			
		||||
{
 | 
			
		||||
| 
						 | 
				
			
			@ -163,7 +163,6 @@ dds_topic_lookup(
 | 
			
		|||
    assert (domain);
 | 
			
		||||
    assert (name);
 | 
			
		||||
 | 
			
		||||
    os_mutexLock (&dds_global.m_mutex);
 | 
			
		||||
    st = ut_avlIterFirst (&dds_topictree_def, &domain->m_topics, &iter);
 | 
			
		||||
    while (st) {
 | 
			
		||||
        if (strcmp (st->name, name) == 0) {
 | 
			
		||||
| 
						 | 
				
			
			@ -171,6 +170,17 @@ dds_topic_lookup(
 | 
			
		|||
        }
 | 
			
		||||
        st = ut_avlIterNext (&iter);
 | 
			
		||||
    }
 | 
			
		||||
    return st;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
sertopic_t
 | 
			
		||||
dds_topic_lookup(
 | 
			
		||||
        dds_domain *domain,
 | 
			
		||||
        const char *name)
 | 
			
		||||
{
 | 
			
		||||
    sertopic_t st;
 | 
			
		||||
    os_mutexLock (&dds_global.m_mutex);
 | 
			
		||||
    st = dds_topic_lookup_locked(domain, name);
 | 
			
		||||
    os_mutexUnlock (&dds_global.m_mutex);
 | 
			
		||||
    return st;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -195,16 +205,14 @@ dds_topic_free(
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
static void
 | 
			
		||||
dds_topic_add(
 | 
			
		||||
dds_topic_add_locked(
 | 
			
		||||
        dds_domainid_t id,
 | 
			
		||||
        sertopic_t st)
 | 
			
		||||
{
 | 
			
		||||
    dds_domain * dom;
 | 
			
		||||
    os_mutexLock (&dds_global.m_mutex);
 | 
			
		||||
    dom = dds_domain_find_locked (id);
 | 
			
		||||
    assert (dom);
 | 
			
		||||
    ut_avlInsert (&dds_topictree_def, &dom->m_topics, st);
 | 
			
		||||
    os_mutexUnlock (&dds_global.m_mutex);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
_Pre_satisfies_((participant & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
 | 
			
		||||
| 
						 | 
				
			
			@ -223,13 +231,15 @@ dds_find_topic(
 | 
			
		|||
    if (name) {
 | 
			
		||||
        rc = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, &p);
 | 
			
		||||
        if (rc == DDS_RETCODE_OK) {
 | 
			
		||||
            st = dds_topic_lookup (p->m_domain, name);
 | 
			
		||||
            os_mutexLock (&dds_global.m_mutex);
 | 
			
		||||
            st = dds_topic_lookup_locked (p->m_domain, name);
 | 
			
		||||
            if (st) {
 | 
			
		||||
                dds_entity_add_ref (&st->status_cb_entity->m_entity);
 | 
			
		||||
                tp = st->status_cb_entity->m_entity.m_hdl;
 | 
			
		||||
            } else {
 | 
			
		||||
                tp = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "Topic is not being created yet");
 | 
			
		||||
            }
 | 
			
		||||
            os_mutexUnlock (&dds_global.m_mutex);
 | 
			
		||||
            dds_entity_unlock(p);
 | 
			
		||||
        } else {
 | 
			
		||||
            tp = DDS_ERRNO(rc, "Error occurred on locking entity");
 | 
			
		||||
| 
						 | 
				
			
			@ -297,6 +307,17 @@ dds_topic_qos_set(
 | 
			
		|||
    return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static bool dupdef_qos_ok(const dds_qos_t *qos, const struct sertopic *st)
 | 
			
		||||
{
 | 
			
		||||
    if ((qos == NULL) != (st->status_cb_entity->m_entity.m_qos == NULL)) {
 | 
			
		||||
        return false;
 | 
			
		||||
    } else if (qos == NULL) {
 | 
			
		||||
        return true;
 | 
			
		||||
    } else {
 | 
			
		||||
        return dds_qos_equal(st->status_cb_entity->m_entity.m_qos, qos);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
_Pre_satisfies_((participant & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
 | 
			
		||||
DDS_EXPORT dds_entity_t
 | 
			
		||||
dds_create_topic(
 | 
			
		||||
| 
						 | 
				
			
			@ -353,97 +374,110 @@ dds_create_topic(
 | 
			
		|||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* FIXME: I find it weird that qos may be NULL in the entity */
 | 
			
		||||
 | 
			
		||||
    /* Check if topic already exists with same name */
 | 
			
		||||
    if (dds_topic_lookup (par->m_domain, name)) {
 | 
			
		||||
        hdl = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "Precondition not met");
 | 
			
		||||
        goto qos_err;
 | 
			
		||||
    }
 | 
			
		||||
    os_mutexLock (&dds_global.m_mutex);
 | 
			
		||||
    if ((st = dds_topic_lookup_locked (par->m_domain, name)) != NULL) {
 | 
			
		||||
        if (st->type != desc) {
 | 
			
		||||
            /* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */
 | 
			
		||||
            hdl = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "Create topic with mismatching type.");
 | 
			
		||||
        } else if (!dupdef_qos_ok(qos, st)) {
 | 
			
		||||
            /* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */
 | 
			
		||||
            hdl = DDS_ERRNO(DDS_RETCODE_INCONSISTENT_POLICY, "Create topic with mismatching qos.");
 | 
			
		||||
        } else {
 | 
			
		||||
            dds_entity_add_ref (&st->status_cb_entity->m_entity);
 | 
			
		||||
            hdl = st->status_cb_entity->m_entity.m_hdl;
 | 
			
		||||
        }
 | 
			
		||||
        os_mutexUnlock (&dds_global.m_mutex);
 | 
			
		||||
    } else {
 | 
			
		||||
        typename = desc->m_typename;
 | 
			
		||||
        key = (char*) dds_alloc (strlen (name) + strlen (typename) + 2);
 | 
			
		||||
        strcpy (key, name);
 | 
			
		||||
        strcat (key, "/");
 | 
			
		||||
        strcat (key, typename);
 | 
			
		||||
 | 
			
		||||
    typename = desc->m_typename;
 | 
			
		||||
    key = (char*) dds_alloc (strlen (name) + strlen (typename) + 2);
 | 
			
		||||
    strcpy (key, name);
 | 
			
		||||
    strcat (key, "/");
 | 
			
		||||
    strcat (key, typename);
 | 
			
		||||
        if (qos) {
 | 
			
		||||
            new_qos = dds_qos_create();
 | 
			
		||||
            /* Only returns failure when one of the qos args is NULL, which
 | 
			
		||||
             * is not the case here. */
 | 
			
		||||
            (void)dds_qos_copy(new_qos, qos);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
    if (qos) {
 | 
			
		||||
        new_qos = dds_qos_create();
 | 
			
		||||
        /* Only returns failure when one of the qos args is NULL, which
 | 
			
		||||
         * is not the case here. */
 | 
			
		||||
        (void)dds_qos_copy(new_qos, qos);
 | 
			
		||||
    }
 | 
			
		||||
        /* Create topic */
 | 
			
		||||
        top = dds_alloc (sizeof (*top));
 | 
			
		||||
        top->m_descriptor = desc;
 | 
			
		||||
        hdl = dds_entity_init (&top->m_entity, par, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK);
 | 
			
		||||
        top->m_entity.m_deriver.delete = dds_topic_delete;
 | 
			
		||||
        top->m_entity.m_deriver.set_qos = dds_topic_qos_set;
 | 
			
		||||
        top->m_entity.m_deriver.validate_status = dds_topic_status_validate;
 | 
			
		||||
 | 
			
		||||
    /* Create topic */
 | 
			
		||||
    top = dds_alloc (sizeof (*top));
 | 
			
		||||
    top->m_descriptor = desc;
 | 
			
		||||
    hdl = dds_entity_init (&top->m_entity, par, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK);
 | 
			
		||||
    top->m_entity.m_deriver.delete = dds_topic_delete;
 | 
			
		||||
    top->m_entity.m_deriver.set_qos = dds_topic_qos_set;
 | 
			
		||||
    top->m_entity.m_deriver.validate_status = dds_topic_status_validate;
 | 
			
		||||
 | 
			
		||||
    st = dds_alloc (sizeof (*st));
 | 
			
		||||
    st->type = (void*) desc;
 | 
			
		||||
    os_atomic_st32 (&st->refcount, 1);
 | 
			
		||||
    st->status_cb = dds_topic_status_cb;
 | 
			
		||||
    st->status_cb_entity = top;
 | 
			
		||||
    st->name_typename = key;
 | 
			
		||||
    st->name = dds_alloc (strlen (name) + 1);
 | 
			
		||||
    strcpy (st->name, name);
 | 
			
		||||
    st->typename = dds_alloc (strlen (typename) + 1);
 | 
			
		||||
    strcpy (st->typename, typename);
 | 
			
		||||
    st->nkeys = desc->m_nkeys;
 | 
			
		||||
    st->keys = desc->m_keys;
 | 
			
		||||
    st->id = next_topicid++;
 | 
			
		||||
        st = dds_alloc (sizeof (*st));
 | 
			
		||||
        st->type = (void*) desc;
 | 
			
		||||
        os_atomic_st32 (&st->refcount, 1);
 | 
			
		||||
        st->status_cb = dds_topic_status_cb;
 | 
			
		||||
        st->status_cb_entity = top;
 | 
			
		||||
        st->name_typename = key;
 | 
			
		||||
        st->name = dds_alloc (strlen (name) + 1);
 | 
			
		||||
        strcpy (st->name, name);
 | 
			
		||||
        st->typename = dds_alloc (strlen (typename) + 1);
 | 
			
		||||
        strcpy (st->typename, typename);
 | 
			
		||||
        st->nkeys = desc->m_nkeys;
 | 
			
		||||
        st->keys = desc->m_keys;
 | 
			
		||||
        st->id = next_topicid++;
 | 
			
		||||
 | 
			
		||||
#ifdef VXWORKS_RTP
 | 
			
		||||
    st->hash = (st->id * UINT64_C (12844332200329132887UL)) >> 32;
 | 
			
		||||
        st->hash = (st->id * UINT64_C (12844332200329132887UL)) >> 32;
 | 
			
		||||
#else
 | 
			
		||||
    st->hash = (st->id * UINT64_C (12844332200329132887)) >> 32;
 | 
			
		||||
        st->hash = (st->id * UINT64_C (12844332200329132887)) >> 32;
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
    /* Check if topic cannot be optimised (memcpy marshal) */
 | 
			
		||||
        /* Check if topic cannot be optimised (memcpy marshal) */
 | 
			
		||||
 | 
			
		||||
    if ((desc->m_flagset & DDS_TOPIC_NO_OPTIMIZE) == 0) {
 | 
			
		||||
        st->opt_size = dds_stream_check_optimize (desc);
 | 
			
		||||
    }
 | 
			
		||||
    top->m_stopic = st;
 | 
			
		||||
 | 
			
		||||
    /* Add topic to extent */
 | 
			
		||||
    dds_topic_add (par->m_domainid, st);
 | 
			
		||||
 | 
			
		||||
    nn_plist_init_empty (&plist);
 | 
			
		||||
    if (new_qos) {
 | 
			
		||||
        dds_qos_merge (&plist.qos, new_qos);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* Set Topic meta data (for SEDP publication) */
 | 
			
		||||
    plist.qos.topic_name = dds_string_dup (st->name);
 | 
			
		||||
    plist.qos.type_name = dds_string_dup (st->typename);
 | 
			
		||||
    plist.qos.present |= (QP_TOPIC_NAME | QP_TYPE_NAME);
 | 
			
		||||
    if (desc->m_meta) {
 | 
			
		||||
        plist.type_description = dds_string_dup (desc->m_meta);
 | 
			
		||||
        plist.present |= PP_PRISMTECH_TYPE_DESCRIPTION;
 | 
			
		||||
    }
 | 
			
		||||
    if (desc->m_nkeys) {
 | 
			
		||||
        plist.qos.present |= QP_PRISMTECH_SUBSCRIPTION_KEYS;
 | 
			
		||||
        plist.qos.subscription_keys.use_key_list = 1;
 | 
			
		||||
        plist.qos.subscription_keys.key_list.n = desc->m_nkeys;
 | 
			
		||||
        plist.qos.subscription_keys.key_list.strs = dds_alloc (desc->m_nkeys * sizeof (char*));
 | 
			
		||||
        for (index = 0; index < desc->m_nkeys; index++) {
 | 
			
		||||
            plist.qos.subscription_keys.key_list.strs[index] = dds_string_dup (desc->m_keys[index].m_name);
 | 
			
		||||
        if ((desc->m_flagset & DDS_TOPIC_NO_OPTIMIZE) == 0) {
 | 
			
		||||
            st->opt_size = dds_stream_check_optimize (desc);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
        top->m_stopic = st;
 | 
			
		||||
 | 
			
		||||
    /* Publish Topic */
 | 
			
		||||
    if (asleep) {
 | 
			
		||||
        thread_state_awake (thr);
 | 
			
		||||
        /* Add topic to extent */
 | 
			
		||||
        dds_topic_add_locked (par->m_domainid, st);
 | 
			
		||||
        os_mutexUnlock (&dds_global.m_mutex);
 | 
			
		||||
 | 
			
		||||
        nn_plist_init_empty (&plist);
 | 
			
		||||
        if (new_qos) {
 | 
			
		||||
            dds_qos_merge (&plist.qos, new_qos);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /* Set Topic meta data (for SEDP publication) */
 | 
			
		||||
        plist.qos.topic_name = dds_string_dup (st->name);
 | 
			
		||||
        plist.qos.type_name = dds_string_dup (st->typename);
 | 
			
		||||
        plist.qos.present |= (QP_TOPIC_NAME | QP_TYPE_NAME);
 | 
			
		||||
        if (desc->m_meta) {
 | 
			
		||||
            plist.type_description = dds_string_dup (desc->m_meta);
 | 
			
		||||
            plist.present |= PP_PRISMTECH_TYPE_DESCRIPTION;
 | 
			
		||||
        }
 | 
			
		||||
        if (desc->m_nkeys) {
 | 
			
		||||
            plist.qos.present |= QP_PRISMTECH_SUBSCRIPTION_KEYS;
 | 
			
		||||
            plist.qos.subscription_keys.use_key_list = 1;
 | 
			
		||||
            plist.qos.subscription_keys.key_list.n = desc->m_nkeys;
 | 
			
		||||
            plist.qos.subscription_keys.key_list.strs = dds_alloc (desc->m_nkeys * sizeof (char*));
 | 
			
		||||
            for (index = 0; index < desc->m_nkeys; index++) {
 | 
			
		||||
                plist.qos.subscription_keys.key_list.strs[index] = dds_string_dup (desc->m_keys[index].m_name);
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /* Publish Topic */
 | 
			
		||||
        if (asleep) {
 | 
			
		||||
            thread_state_awake (thr);
 | 
			
		||||
        }
 | 
			
		||||
        ddsi_pp = ephash_lookup_participant_guid (&par->m_guid);
 | 
			
		||||
        assert (ddsi_pp);
 | 
			
		||||
        sedp_write_topic (ddsi_pp, &plist);
 | 
			
		||||
        if (asleep) {
 | 
			
		||||
            thread_state_asleep (thr);
 | 
			
		||||
        }
 | 
			
		||||
        nn_plist_fini (&plist);
 | 
			
		||||
    }
 | 
			
		||||
    ddsi_pp = ephash_lookup_participant_guid (&par->m_guid);
 | 
			
		||||
    assert (ddsi_pp);
 | 
			
		||||
    sedp_write_topic (ddsi_pp, &plist);
 | 
			
		||||
    if (asleep) {
 | 
			
		||||
        thread_state_asleep (thr);
 | 
			
		||||
    }
 | 
			
		||||
    nn_plist_fini (&plist);
 | 
			
		||||
 | 
			
		||||
qos_err:
 | 
			
		||||
    dds_entity_unlock(par);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -125,9 +125,12 @@ Test(ddsc_topic_create, non_participants, .init=ddsc_topic_init, .fini=ddsc_topi
 | 
			
		|||
Test(ddsc_topic_create, duplicate, .init=ddsc_topic_init, .fini=ddsc_topic_fini)
 | 
			
		||||
{
 | 
			
		||||
    dds_entity_t topic;
 | 
			
		||||
    /* Creating the same topic should fail.  */
 | 
			
		||||
    dds_return_t ret;
 | 
			
		||||
    /* Creating the same topic should succeed.  */
 | 
			
		||||
    topic = dds_create_topic(g_participant, &RoundTripModule_DataType_desc, g_topicRtmDataTypeName, NULL, NULL);
 | 
			
		||||
    cr_assert_eq(dds_err_nr(topic), DDS_RETCODE_PRECONDITION_NOT_MET, "returned %s", dds_err_str(topic));
 | 
			
		||||
    cr_assert_gt(topic, 0, "returned %s", dds_err_str(topic));
 | 
			
		||||
    ret = dds_delete(topic);
 | 
			
		||||
    cr_assert_eq(ret, DDS_RETCODE_OK);
 | 
			
		||||
}
 | 
			
		||||
/*************************************************************************************************/
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue