Additional tests for liveliness QoS and minor refactoring
Added unit tests for (1) testing the scenario that a proxy writer writes data after its lease is expired, to check that the status for the pwr is set to alive again and (2) stress-testing the creation and deletetion of writers with decreasing lease duration. In addition I've optimized the locking in unref_proxy_participant a bit and fixed the liveliness changed callback when a writer with expired lease (not-alive) gets alive again. Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
		
							parent
							
								
									3822f42eff
								
							
						
					
					
						commit
						32c5a59c8f
					
				
					 4 changed files with 191 additions and 29 deletions
				
			
		| 
						 | 
				
			
			@ -2394,7 +2394,7 @@ static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_
 | 
			
		|||
    ddsrt_atomic_st32 (&cond->m_entity.m_status.m_trigger, trigger);
 | 
			
		||||
    dds_entity_status_signal (&cond->m_entity, DDS_DATA_AVAILABLE_STATUS);
 | 
			
		||||
  }
 | 
			
		||||
  
 | 
			
		||||
 | 
			
		||||
  TRACE ("add_readcondition(%p, %"PRIx32", %"PRIx32", %"PRIx32") => %p qminv %"PRIx32" ; rhc %"PRIu32" conds\n",
 | 
			
		||||
    (void *) rhc, cond->m_sample_states, cond->m_view_states,
 | 
			
		||||
    cond->m_instance_states, (void *) cond, cond->m_qminv, rhc->nconds);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -31,8 +31,7 @@
 | 
			
		|||
#define DDS_DOMAINID_PUB 0
 | 
			
		||||
#define DDS_DOMAINID_SUB 1
 | 
			
		||||
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<General><NetworkInterfaceAddress>127.0.0.1</NetworkInterfaceAddress></General><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
 | 
			
		||||
#define DDS_CONFIG_NO_PORT_GAIN_LOG_PUB "<"DDS_PROJECT_NAME"><Domain><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness_pub.log</OutputFile><Verbosity>finest</Verbosity></Tracing></Domain></"DDS_PROJECT_NAME">"
 | 
			
		||||
#define DDS_CONFIG_NO_PORT_GAIN_LOG_SUB "<"DDS_PROJECT_NAME"><Domain><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness_sub.log</OutputFile><Verbosity>finest</Verbosity></Tracing></Domain></"DDS_PROJECT_NAME">"
 | 
			
		||||
#define DDS_CONFIG_NO_PORT_GAIN_LOG "<"DDS_PROJECT_NAME"><Domain><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness.log</OutputFile><Verbosity>finest</Verbosity></Tracing></Domain></"DDS_PROJECT_NAME">"
 | 
			
		||||
 | 
			
		||||
uint32_t g_topic_nr = 0;
 | 
			
		||||
static dds_entity_t g_pub_domain = 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -130,7 +129,7 @@ CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
 | 
			
		|||
};
 | 
			
		||||
#undef A
 | 
			
		||||
#undef MP
 | 
			
		||||
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 10)
 | 
			
		||||
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
 | 
			
		||||
{
 | 
			
		||||
	dds_entity_t pub_topic;
 | 
			
		||||
	dds_entity_t sub_topic;
 | 
			
		||||
| 
						 | 
				
			
			@ -186,7 +185,8 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
 | 
			
		|||
				 (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
 | 
			
		||||
				 start_seqno, end_seqno);
 | 
			
		||||
 | 
			
		||||
	CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 1 : 0))
 | 
			
		||||
	/* end-start should be mult - 1, but allow 1 pmd sample to be lost */
 | 
			
		||||
	CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 2 : 0))
 | 
			
		||||
	if (kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT)
 | 
			
		||||
		CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -198,16 +198,12 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
/* FIXME: add DDS_LIVELINESS_MANUAL_BY_TOPIC */
 | 
			
		||||
#define A DDS_LIVELINESS_AUTOMATIC
 | 
			
		||||
#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
 | 
			
		||||
CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
 | 
			
		||||
		CU_DataPoints(uint32_t,  200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration */
 | 
			
		||||
		CU_DataPoints(double,    0.3, 0.3, 0.3, 0.3,   2,   2,   2,   2,   2,   2,   2,   2,   2), /* delay (n times lease duration) */
 | 
			
		||||
		CU_DataPoints(size_t,      1,   0,   2,   0,   1,   0,   1,   2,   0,   5,   0,  15,  15), /* number of writers with automatic liveliness */
 | 
			
		||||
		CU_DataPoints(size_t,      1,   1,   2,   2,   1,   1,   0,   2,   2,   5,  10,   0,  15), /* number of writers with manual-by-participant liveliness */
 | 
			
		||||
};
 | 
			
		||||
#undef A
 | 
			
		||||
#undef MP
 | 
			
		||||
CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60)
 | 
			
		||||
{
 | 
			
		||||
	dds_entity_t pub_topic;
 | 
			
		||||
| 
						 | 
				
			
			@ -467,4 +463,152 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
 | 
			
		|||
	CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#define MAX_WRITERS 100
 | 
			
		||||
CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini)
 | 
			
		||||
{
 | 
			
		||||
	dds_entity_t pub_topic;
 | 
			
		||||
	dds_entity_t sub_topic;
 | 
			
		||||
	dds_entity_t reader;
 | 
			
		||||
	dds_entity_t writers[MAX_WRITERS];
 | 
			
		||||
	dds_entity_t waitset;
 | 
			
		||||
	dds_qos_t *wqos;
 | 
			
		||||
	struct dds_liveliness_changed_status lstatus;
 | 
			
		||||
	size_t wr_cnt = 0;
 | 
			
		||||
	char name[100];
 | 
			
		||||
	dds_qos_t *rqos;
 | 
			
		||||
	dds_attach_t triggered;
 | 
			
		||||
	uint32_t n, status;
 | 
			
		||||
	Space_Type1 sample = { 0, 0, 0 };
 | 
			
		||||
 | 
			
		||||
	/* topics */
 | 
			
		||||
	create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name);
 | 
			
		||||
	CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
 | 
			
		||||
	CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
 | 
			
		||||
 | 
			
		||||
	/* reader and waitset */
 | 
			
		||||
	CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
 | 
			
		||||
	dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
 | 
			
		||||
	CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
 | 
			
		||||
	dds_delete_qos(rqos);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
	/* create 1st writer and wait for it to become alive */
 | 
			
		||||
	CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
 | 
			
		||||
	dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500));
 | 
			
		||||
	CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
	/* create writers */
 | 
			
		||||
	for (n = 1; n < MAX_WRITERS; n++)
 | 
			
		||||
	{
 | 
			
		||||
		dds_qset_liveliness(wqos, n % 1 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500 - n));
 | 
			
		||||
		CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
 | 
			
		||||
		dds_write (writers[n], &sample);
 | 
			
		||||
		if (n % 3 == 2)
 | 
			
		||||
			dds_delete(writers[n]);
 | 
			
		||||
	}
 | 
			
		||||
	dds_delete_qos(wqos);
 | 
			
		||||
 | 
			
		||||
	/* wait for all writers to become alive */
 | 
			
		||||
	while (wr_cnt < MAX_WRITERS)
 | 
			
		||||
	{
 | 
			
		||||
		CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK);
 | 
			
		||||
		wr_cnt += (uint32_t)lstatus.alive_count_change;
 | 
			
		||||
		dds_sleepfor (DDS_MSECS(50));
 | 
			
		||||
	}
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL (wr_cnt, MAX_WRITERS);
 | 
			
		||||
 | 
			
		||||
	/* cleanup remaining writers */
 | 
			
		||||
	for (n = 0; n < wr_cnt; n++)
 | 
			
		||||
	{
 | 
			
		||||
		if (n % 3 != 2)
 | 
			
		||||
			CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
 | 
			
		||||
	}
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
 | 
			
		||||
}
 | 
			
		||||
#undef MAX_WRITERS
 | 
			
		||||
 | 
			
		||||
CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
 | 
			
		||||
{
 | 
			
		||||
	dds_entity_t pub_topic;
 | 
			
		||||
	dds_entity_t sub_topic;
 | 
			
		||||
	dds_entity_t reader;
 | 
			
		||||
	dds_entity_t writer;
 | 
			
		||||
	dds_entity_t waitset;
 | 
			
		||||
	dds_qos_t *rqos;
 | 
			
		||||
	dds_qos_t *wqos;
 | 
			
		||||
	dds_attach_t triggered;
 | 
			
		||||
	uint32_t status = 0;
 | 
			
		||||
	struct dds_liveliness_changed_status lstatus;
 | 
			
		||||
	struct dds_subscription_matched_status sstatus;
 | 
			
		||||
	char name[100];
 | 
			
		||||
	dds_duration_t ldur = DDS_MSECS (500);
 | 
			
		||||
	Space_Type1 sample = { 1, 0, 0 };
 | 
			
		||||
 | 
			
		||||
	/* topics */
 | 
			
		||||
	create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name);
 | 
			
		||||
	CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
 | 
			
		||||
	CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
 | 
			
		||||
 | 
			
		||||
	/* reader */
 | 
			
		||||
	CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
 | 
			
		||||
	dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
 | 
			
		||||
	CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
 | 
			
		||||
	dds_delete_qos(rqos);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
	/* writer */
 | 
			
		||||
	CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
 | 
			
		||||
	dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur);
 | 
			
		||||
	CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
 | 
			
		||||
	dds_delete_qos(wqos);
 | 
			
		||||
 | 
			
		||||
	/* wait for writer to be alive */
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
	/* check status counts before proxy writer is expired */
 | 
			
		||||
	dds_get_liveliness_changed_status(reader, &lstatus);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
 | 
			
		||||
	dds_get_subscription_matched_status(reader, &sstatus);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
 | 
			
		||||
 | 
			
		||||
	/* sleep for more than lease duration, writer should be set not-alive but subscription still matched */
 | 
			
		||||
	dds_sleepfor(ldur + DDS_MSECS(100));
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
	dds_get_liveliness_changed_status(reader, &lstatus);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0);
 | 
			
		||||
	dds_get_subscription_matched_status(reader, &sstatus);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
 | 
			
		||||
 | 
			
		||||
	/* write sample and re-check status counts */
 | 
			
		||||
	dds_write (writer, &sample);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
	dds_get_liveliness_changed_status(reader, &lstatus);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
 | 
			
		||||
	dds_get_subscription_matched_status(reader, &sstatus);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
 | 
			
		||||
 | 
			
		||||
	/* cleanup */
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
 | 
			
		||||
	CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue