Add a test for duration 0ns and 1ns manual lease
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									5632ed46b5
								
							
						
					
					
						commit
						4af531a1c3
					
				
					 2 changed files with 270 additions and 1 deletions
				
			
		| 
						 | 
				
			
			@ -814,3 +814,272 @@ CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp)
 | 
			
		|||
  } while (!test_finished);
 | 
			
		||||
}
 | 
			
		||||
#undef MAX_WRITERS
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Check that manual-by-participant/topic writers with lease duration 0ns and 1ns work.
 | 
			
		||||
 */
 | 
			
		||||
struct liveliness_changed_state {
 | 
			
		||||
  ddsrt_mutex_t lock;
 | 
			
		||||
  dds_instance_handle_t w0_handle;
 | 
			
		||||
  bool weirdness;
 | 
			
		||||
  uint32_t w0_alive, w0_not_alive;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
static void liveliness_changed_listener (dds_entity_t rd, const dds_liveliness_changed_status_t status, void *arg)
 | 
			
		||||
{
 | 
			
		||||
  struct liveliness_changed_state *st = arg;
 | 
			
		||||
  (void) rd;
 | 
			
		||||
 | 
			
		||||
  ddsrt_mutex_lock (&st->lock);
 | 
			
		||||
  if (status.last_publication_handle != st->w0_handle)
 | 
			
		||||
  {
 | 
			
		||||
    if (st->w0_handle == 0)
 | 
			
		||||
    {
 | 
			
		||||
      printf ("liveliness_changed_listener: w0 = %"PRIx64"\n", status.last_publication_handle);
 | 
			
		||||
      st->w0_handle = status.last_publication_handle;
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      printf ("liveliness_changed_listener: too many writer handles\n");
 | 
			
		||||
      st->weirdness = true;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (status.alive_count_change != 0 || status.not_alive_count_change != 0)
 | 
			
		||||
  {
 | 
			
		||||
    switch (status.alive_count_change)
 | 
			
		||||
    {
 | 
			
		||||
      case -1:
 | 
			
		||||
        break;
 | 
			
		||||
      case 1:
 | 
			
		||||
        if (status.last_publication_handle == st->w0_handle)
 | 
			
		||||
          st->w0_alive++;
 | 
			
		||||
        else
 | 
			
		||||
        {
 | 
			
		||||
          printf ("liveliness_changed_listener: alive_count_change = %d: unrecognized writer\n", status.alive_count_change);
 | 
			
		||||
          st->weirdness = true;
 | 
			
		||||
        }
 | 
			
		||||
        break;
 | 
			
		||||
      default:
 | 
			
		||||
        printf ("liveliness_changed_listener: alive_count_change = %d\n", status.alive_count_change);
 | 
			
		||||
        st->weirdness = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    switch (status.not_alive_count_change)
 | 
			
		||||
    {
 | 
			
		||||
      case -1:
 | 
			
		||||
        break;
 | 
			
		||||
      case 1:
 | 
			
		||||
        if (status.last_publication_handle == st->w0_handle)
 | 
			
		||||
          st->w0_not_alive++;
 | 
			
		||||
        else
 | 
			
		||||
        {
 | 
			
		||||
          printf ("liveliness_changed_listener: not_alive_count_change = %d: unrecognized writer\n", status.not_alive_count_change);
 | 
			
		||||
          st->weirdness = true;
 | 
			
		||||
        }
 | 
			
		||||
        break;
 | 
			
		||||
      default:
 | 
			
		||||
        printf ("liveliness_changed_listener: not_alive_count_change = %d\n", status.not_alive_count_change);
 | 
			
		||||
        st->weirdness = true;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  ddsrt_mutex_unlock (&st->lock);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static bool get_and_check_status (dds_entity_t reader, dds_entity_t writer_active)
 | 
			
		||||
{
 | 
			
		||||
  struct dds_liveliness_changed_status lstatus;
 | 
			
		||||
  struct dds_subscription_matched_status sstatus;
 | 
			
		||||
  struct dds_publication_matched_status pstatus;
 | 
			
		||||
  dds_return_t rc;
 | 
			
		||||
  rc = dds_get_subscription_matched_status(reader, &sstatus);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_get_liveliness_changed_status(reader, &lstatus);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_get_publication_matched_status(writer_active, &pstatus);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  CU_ASSERT_FATAL(lstatus.alive_count + lstatus.not_alive_count <= 2);
 | 
			
		||||
  printf ("%d %d %d %d\n", (int)sstatus.current_count, (int)lstatus.alive_count, (int)lstatus.not_alive_count, (int)pstatus.current_count);
 | 
			
		||||
  return (sstatus.current_count == 2 && lstatus.not_alive_count == 2 && pstatus.current_count == 1);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur)
 | 
			
		||||
{
 | 
			
		||||
  const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 100 : 10;
 | 
			
		||||
  dds_entity_t pub_topic;
 | 
			
		||||
  dds_entity_t sub_topic;
 | 
			
		||||
  dds_entity_t reader;
 | 
			
		||||
  dds_entity_t writer_active; /* writing */
 | 
			
		||||
  dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */
 | 
			
		||||
  dds_entity_t waitset;
 | 
			
		||||
  dds_listener_t *listener;
 | 
			
		||||
  dds_qos_t *qos;
 | 
			
		||||
  dds_return_t rc;
 | 
			
		||||
  struct dds_liveliness_changed_status lstatus;
 | 
			
		||||
  char name[100];
 | 
			
		||||
  Space_Type1 sample = {1, 0, 0};
 | 
			
		||||
  struct liveliness_changed_state listener_state = {
 | 
			
		||||
    .weirdness = false,
 | 
			
		||||
    .w0_handle = 0,
 | 
			
		||||
    .w0_alive = 0,
 | 
			
		||||
    .w0_not_alive = 0,
 | 
			
		||||
  };
 | 
			
		||||
  ddsrt_mutex_init (&listener_state.lock);
 | 
			
		||||
 | 
			
		||||
  waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE);
 | 
			
		||||
  CU_ASSERT_FATAL(waitset > 0);
 | 
			
		||||
 | 
			
		||||
  qos = dds_create_qos();
 | 
			
		||||
  CU_ASSERT_FATAL(qos != NULL);
 | 
			
		||||
  dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
 | 
			
		||||
  dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, 0);
 | 
			
		||||
 | 
			
		||||
  create_topic_name("ddsc_liveliness_lease_duration_zero", g_topic_nr++, name, sizeof name);
 | 
			
		||||
  pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, qos, NULL);
 | 
			
		||||
  CU_ASSERT_FATAL(pub_topic > 0);
 | 
			
		||||
  sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, qos, NULL);
 | 
			
		||||
  CU_ASSERT_FATAL(sub_topic > 0);
 | 
			
		||||
 | 
			
		||||
  /* reader liveliness is always automatic/infinity */
 | 
			
		||||
  dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
 | 
			
		||||
  reader = dds_create_reader(g_sub_participant, sub_topic, qos, NULL);
 | 
			
		||||
  CU_ASSERT_FATAL(reader > 0);
 | 
			
		||||
  rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_waitset_attach(waitset, reader, reader);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
  /* writer liveliness varies */
 | 
			
		||||
  dds_qset_liveliness(qos, lkind, ldur);
 | 
			
		||||
  writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL);
 | 
			
		||||
  CU_ASSERT_FATAL(writer_active > 0);
 | 
			
		||||
  writer_inactive = dds_create_writer(g_pub_participant, pub_topic, qos, NULL);
 | 
			
		||||
  CU_ASSERT_FATAL(writer_inactive > 0);
 | 
			
		||||
  rc = dds_set_status_mask(writer_active, DDS_PUBLICATION_MATCHED_STATUS);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_waitset_attach(waitset, writer_active, writer_active);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
  dds_delete_qos(qos);
 | 
			
		||||
 | 
			
		||||
  /* wait for writers to be discovered and to have lost their liveliness, and for
 | 
			
		||||
     writer_active to have discovered the reader */
 | 
			
		||||
  while (!get_and_check_status (reader, writer_active))
 | 
			
		||||
  {
 | 
			
		||||
    rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(5));
 | 
			
		||||
    if (rc < 1)
 | 
			
		||||
    {
 | 
			
		||||
      get_and_check_status (reader, writer_active);
 | 
			
		||||
      CU_ASSERT_FATAL(rc >= 1);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* switch to using a listener: those allow us to observe all events */
 | 
			
		||||
  listener = dds_create_listener (&listener_state);
 | 
			
		||||
  dds_lset_liveliness_changed(listener, liveliness_changed_listener);
 | 
			
		||||
  rc = dds_set_listener (reader, listener);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  dds_delete_listener (listener);
 | 
			
		||||
 | 
			
		||||
  /* write as fast as possible - we don't expect this to cause the writers
 | 
			
		||||
     to gain and lose liveliness once for each sample, but it should have
 | 
			
		||||
     become alive at least once and fall back to not alive afterward */
 | 
			
		||||
  for (uint32_t i = 0; i < nsamples; i++)
 | 
			
		||||
  {
 | 
			
		||||
    rc = dds_write(writer_active, &sample);
 | 
			
		||||
    CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
    if (sleep && i < nsamples - 1)
 | 
			
		||||
      dds_sleepfor(sleep);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  rc = dds_wait_for_acks(writer_active, DDS_SECS(5));
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
  /* verify the reader received all samples */
 | 
			
		||||
  {
 | 
			
		||||
    void *raw[] = { &sample };
 | 
			
		||||
    dds_sample_info_t si;
 | 
			
		||||
    uint32_t cnt = 0;
 | 
			
		||||
    while (dds_take(reader, raw, &si, 1, 1) == 1)
 | 
			
		||||
      cnt++;
 | 
			
		||||
    CU_ASSERT(cnt == nsamples);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* transition to not alive is not necessarily immediate */
 | 
			
		||||
  {
 | 
			
		||||
    int retries = 100;
 | 
			
		||||
    rc = dds_get_liveliness_changed_status(reader, &lstatus);
 | 
			
		||||
    CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
    printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
 | 
			
		||||
 | 
			
		||||
    ddsrt_mutex_lock (&listener_state.lock);
 | 
			
		||||
    printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive);
 | 
			
		||||
    CU_ASSERT(!listener_state.weirdness);
 | 
			
		||||
    CU_ASSERT(listener_state.w0_handle != 0);
 | 
			
		||||
    while (listener_state.w0_not_alive < listener_state.w0_alive && retries-- > 0)
 | 
			
		||||
    {
 | 
			
		||||
      ddsrt_mutex_unlock(&listener_state.lock);
 | 
			
		||||
      dds_sleepfor(DDS_MSECS(10));
 | 
			
		||||
      rc = dds_get_liveliness_changed_status(reader, &lstatus);
 | 
			
		||||
      CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
      ddsrt_mutex_lock(&listener_state.lock);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
 | 
			
		||||
    printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive);
 | 
			
		||||
    CU_ASSERT(listener_state.w0_alive == listener_state.w0_not_alive);
 | 
			
		||||
    if (sleep == 0)
 | 
			
		||||
    {
 | 
			
		||||
      /* if not sleeping, it's ok if the transition happens only once */
 | 
			
		||||
      CU_ASSERT(listener_state.w0_alive > 0);
 | 
			
		||||
    }
 | 
			
		||||
    else if (sleep <= DDS_MSECS(10))
 | 
			
		||||
    {
 | 
			
		||||
      /* if sleeping briefly, expect the a good number of writes to toggle liveliness */
 | 
			
		||||
      CU_ASSERT(listener_state.w0_alive >= nsamples / 3);
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      /* if sleeping, expect the vast majority (90%) of the writes to toggle liveliness */
 | 
			
		||||
      CU_ASSERT(listener_state.w0_alive >= nsamples - nsamples / 10);
 | 
			
		||||
    }
 | 
			
		||||
    ddsrt_mutex_unlock(&listener_state.lock);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* cleanup */
 | 
			
		||||
  rc = dds_delete(waitset);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_delete(reader);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_delete(writer_active);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_delete(writer_inactive);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_delete(sub_topic);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
  rc = dds_delete(pub_topic);
 | 
			
		||||
  CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
 | 
			
		||||
 | 
			
		||||
  ddsrt_mutex_destroy(&listener_state.lock);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
CU_Test(ddsc_liveliness, lease_duration_zero_or_one, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
 | 
			
		||||
{
 | 
			
		||||
  static const dds_duration_t sleep[] = { 0, DDS_MSECS(10), DDS_MSECS(100) };
 | 
			
		||||
  static const dds_liveliness_kind_t lkind[] = { DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_LIVELINESS_MANUAL_BY_TOPIC };
 | 
			
		||||
  static const dds_duration_t ldur[] = { 0, 1 };
 | 
			
		||||
  for (size_t sleep_idx = 0; sleep_idx < sizeof (sleep) / sizeof (sleep[0]); sleep_idx++)
 | 
			
		||||
  {
 | 
			
		||||
    for (size_t lkind_idx = 0; lkind_idx < sizeof (lkind) / sizeof (lkind[0]); lkind_idx++)
 | 
			
		||||
    {
 | 
			
		||||
      for (size_t ldur_idx = 0; ldur_idx < sizeof (ldur) / sizeof (ldur[0]); ldur_idx++)
 | 
			
		||||
      {
 | 
			
		||||
        dds_duration_t s = sleep[sleep_idx];
 | 
			
		||||
        dds_liveliness_kind_t k = lkind[lkind_idx];
 | 
			
		||||
        dds_duration_t d = ldur[ldur_idx];
 | 
			
		||||
        printf ("lease_duration_zero_or_one: sleep = %"PRId64" lkind = %d ldur = %"PRId64"\n", s, (int) k, d);
 | 
			
		||||
        lease_duration_zero_or_one_impl (s, k, d);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue