rework roundtrip to print 99 percentile & max and to optionally use listeners
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									5c0bdddc2a
								
							
						
					
					
						commit
						e062da6798
					
				
					 2 changed files with 225 additions and 181 deletions
				
			
		| 
						 | 
				
			
			@ -12,7 +12,7 @@
 | 
			
		|||
 | 
			
		||||
/* Forward declaration */
 | 
			
		||||
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond);
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener);
 | 
			
		||||
static void finalize_dds(dds_entity_t participant, dds_entity_t reader, dds_entity_t readCond);
 | 
			
		||||
 | 
			
		||||
typedef struct ExampleTimeStats
 | 
			
		||||
| 
						 | 
				
			
			@ -101,6 +101,12 @@ static double exampleGetMedianFromTimeStats (ExampleTimeStats *stats)
 | 
			
		|||
  return median;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static dds_time_t exampleGet99PercentileFromTimeStats (ExampleTimeStats *stats)
 | 
			
		||||
{
 | 
			
		||||
  qsort (stats->values, stats->valuesSize, sizeof (dds_time_t), exampleCompareul);
 | 
			
		||||
  return stats->values[stats->valuesSize - stats->valuesSize / 100];
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static dds_entity_t waitSet;
 | 
			
		||||
 | 
			
		||||
#ifdef _WIN32
 | 
			
		||||
| 
						 | 
				
			
			@ -117,44 +123,124 @@ static void CtrlHandler (int fdwCtrlType)
 | 
			
		|||
}
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
static dds_entity_t writer;
 | 
			
		||||
static dds_entity_t reader;
 | 
			
		||||
static dds_entity_t participant;
 | 
			
		||||
static dds_entity_t readCond;
 | 
			
		||||
 | 
			
		||||
static ExampleTimeStats roundTrip;
 | 
			
		||||
static ExampleTimeStats writeAccess;
 | 
			
		||||
static ExampleTimeStats readAccess;
 | 
			
		||||
static ExampleTimeStats roundTripOverall;
 | 
			
		||||
static ExampleTimeStats writeAccessOverall;
 | 
			
		||||
static ExampleTimeStats readAccessOverall;
 | 
			
		||||
 | 
			
		||||
static RoundTripModule_DataType pub_data;
 | 
			
		||||
static RoundTripModule_DataType sub_data[MAX_SAMPLES];
 | 
			
		||||
static void *samples[MAX_SAMPLES];
 | 
			
		||||
static dds_sample_info_t info[MAX_SAMPLES];
 | 
			
		||||
 | 
			
		||||
static dds_time_t startTime;
 | 
			
		||||
static dds_time_t preWriteTime;
 | 
			
		||||
static dds_time_t postWriteTime;
 | 
			
		||||
static dds_time_t preTakeTime;
 | 
			
		||||
static dds_time_t postTakeTime;
 | 
			
		||||
static dds_time_t elapsed = 0;
 | 
			
		||||
 | 
			
		||||
static bool warmUp = true;
 | 
			
		||||
 | 
			
		||||
static void data_available(dds_entity_t reader, void *arg)
 | 
			
		||||
{
 | 
			
		||||
  dds_time_t difference = 0;
 | 
			
		||||
  int status;
 | 
			
		||||
  (void)arg;
 | 
			
		||||
  /* Take sample and check that it is valid */
 | 
			
		||||
  preTakeTime = dds_time ();
 | 
			
		||||
  status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  postTakeTime = dds_time ();
 | 
			
		||||
 | 
			
		||||
  /* Update stats */
 | 
			
		||||
  difference = (postWriteTime - preWriteTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
  writeAccess = *exampleAddTimingToTimeStats (&writeAccess, difference);
 | 
			
		||||
  writeAccessOverall = *exampleAddTimingToTimeStats (&writeAccessOverall, difference);
 | 
			
		||||
 | 
			
		||||
  difference = (postTakeTime - preTakeTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
  readAccess = *exampleAddTimingToTimeStats (&readAccess, difference);
 | 
			
		||||
  readAccessOverall = *exampleAddTimingToTimeStats (&readAccessOverall, difference);
 | 
			
		||||
 | 
			
		||||
  difference = (postTakeTime - info[0].source_timestamp)/DDS_NSECS_IN_USEC;
 | 
			
		||||
  roundTrip = *exampleAddTimingToTimeStats (&roundTrip, difference);
 | 
			
		||||
  roundTripOverall = *exampleAddTimingToTimeStats (&roundTripOverall, difference);
 | 
			
		||||
 | 
			
		||||
  if (!warmUp) {
 | 
			
		||||
    /* Print stats each second */
 | 
			
		||||
    difference = (postTakeTime - startTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
    if (difference > US_IN_ONE_SEC)
 | 
			
		||||
    {
 | 
			
		||||
      printf("%9" PRIi64 " %9lu %8.0f %8" PRIi64 " %8" PRIi64 " %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n",
 | 
			
		||||
             elapsed + 1,
 | 
			
		||||
             roundTrip.count,
 | 
			
		||||
             exampleGetMedianFromTimeStats (&roundTrip),
 | 
			
		||||
             roundTrip.min,
 | 
			
		||||
             exampleGet99PercentileFromTimeStats (&roundTrip),
 | 
			
		||||
             roundTrip.max,
 | 
			
		||||
             writeAccess.count,
 | 
			
		||||
             exampleGetMedianFromTimeStats (&writeAccess),
 | 
			
		||||
             writeAccess.min,
 | 
			
		||||
             readAccess.count,
 | 
			
		||||
             exampleGetMedianFromTimeStats (&readAccess),
 | 
			
		||||
             readAccess.min);
 | 
			
		||||
 | 
			
		||||
      exampleResetTimeStats (&roundTrip);
 | 
			
		||||
      exampleResetTimeStats (&writeAccess);
 | 
			
		||||
      exampleResetTimeStats (&readAccess);
 | 
			
		||||
      startTime = dds_time ();
 | 
			
		||||
      elapsed++;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  preWriteTime = dds_time();
 | 
			
		||||
  status = dds_write_ts (writer, &pub_data, preWriteTime);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  postWriteTime = dds_time();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void usage(void)
 | 
			
		||||
{
 | 
			
		||||
  printf ("Usage (parameters must be supplied in order):\n"
 | 
			
		||||
          "./ping [-l] [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]\n"
 | 
			
		||||
          "./ping quit - ping sends a quit signal to pong.\n"
 | 
			
		||||
          "Defaults:\n"
 | 
			
		||||
          "./ping 0 0 0\n");
 | 
			
		||||
  exit(EXIT_FAILURE);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int main (int argc, char *argv[])
 | 
			
		||||
{
 | 
			
		||||
  dds_entity_t writer;
 | 
			
		||||
  dds_entity_t reader;
 | 
			
		||||
  dds_entity_t participant;
 | 
			
		||||
  dds_entity_t readCond;
 | 
			
		||||
 | 
			
		||||
  ExampleTimeStats roundTrip;
 | 
			
		||||
  ExampleTimeStats writeAccess;
 | 
			
		||||
  ExampleTimeStats readAccess;
 | 
			
		||||
  ExampleTimeStats roundTripOverall;
 | 
			
		||||
  ExampleTimeStats writeAccessOverall;
 | 
			
		||||
  ExampleTimeStats readAccessOverall;
 | 
			
		||||
 | 
			
		||||
  unsigned long payloadSize = 0;
 | 
			
		||||
  unsigned long long numSamples = 0;
 | 
			
		||||
  bool invalidargs = false;
 | 
			
		||||
  dds_time_t timeOut = 0;
 | 
			
		||||
  dds_time_t startTime;
 | 
			
		||||
  dds_time_t time;
 | 
			
		||||
  dds_time_t preWriteTime;
 | 
			
		||||
  dds_time_t postWriteTime;
 | 
			
		||||
  dds_time_t preTakeTime;
 | 
			
		||||
  dds_time_t postTakeTime;
 | 
			
		||||
  dds_time_t difference = 0;
 | 
			
		||||
  dds_time_t elapsed = 0;
 | 
			
		||||
 | 
			
		||||
  RoundTripModule_DataType pub_data;
 | 
			
		||||
  RoundTripModule_DataType sub_data[MAX_SAMPLES];
 | 
			
		||||
  void *samples[MAX_SAMPLES];
 | 
			
		||||
  dds_sample_info_t info[MAX_SAMPLES];
 | 
			
		||||
 | 
			
		||||
  dds_attach_t wsresults[1];
 | 
			
		||||
  size_t wsresultsize = 1U;
 | 
			
		||||
  dds_time_t waitTimeout = DDS_SECS (1);
 | 
			
		||||
  unsigned long i;
 | 
			
		||||
  int status;
 | 
			
		||||
  bool warmUp = true;
 | 
			
		||||
 | 
			
		||||
  dds_listener_t *listener = NULL;
 | 
			
		||||
  bool use_listener = false;
 | 
			
		||||
  int argidx = 1;
 | 
			
		||||
 | 
			
		||||
  /* poor man's getopt works even on Windows */
 | 
			
		||||
  if (argc > argidx && strcmp(argv[argidx], "-l") == 0)
 | 
			
		||||
  {
 | 
			
		||||
    argidx++;
 | 
			
		||||
    use_listener = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* Register handler for Ctrl-C */
 | 
			
		||||
#ifdef _WIN32
 | 
			
		||||
| 
						 | 
				
			
			@ -182,11 +268,17 @@ int main (int argc, char *argv[])
 | 
			
		|||
    samples[i] = &sub_data[i];
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  participant = prepare_dds(&writer, &reader, &readCond);
 | 
			
		||||
  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
 | 
			
		||||
  DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
  listener = dds_listener_create(NULL);
 | 
			
		||||
  dds_lset_data_available(listener, data_available);
 | 
			
		||||
 | 
			
		||||
  prepare_dds(&writer, &reader, &readCond, listener);
 | 
			
		||||
 | 
			
		||||
  setvbuf(stdout, NULL, _IONBF, 0);
 | 
			
		||||
 | 
			
		||||
  if (argc == 2 && strcmp (argv[1], "quit") == 0)
 | 
			
		||||
  if (argc - argidx == 1 && strcmp (argv[argidx], "quit") == 0)
 | 
			
		||||
  {
 | 
			
		||||
    printf ("Sending termination request.\n");
 | 
			
		||||
    /* pong uses a waitset which is triggered by instance disposal, and
 | 
			
		||||
| 
						 | 
				
			
			@ -202,36 +294,29 @@ int main (int argc, char *argv[])
 | 
			
		|||
    goto done;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (argc == 1)
 | 
			
		||||
  if (argc - argidx == 0)
 | 
			
		||||
  {
 | 
			
		||||
    invalidargs = true;
 | 
			
		||||
  }
 | 
			
		||||
  if (argc >= 2)
 | 
			
		||||
  if (argc - argidx >= 1)
 | 
			
		||||
  {
 | 
			
		||||
    payloadSize = atol (argv[1]);
 | 
			
		||||
    payloadSize = atol (argv[argidx]);
 | 
			
		||||
 | 
			
		||||
    if (payloadSize > 65536)
 | 
			
		||||
    if (payloadSize > 100 * 1048576)
 | 
			
		||||
    {
 | 
			
		||||
      invalidargs = true;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  if (argc >= 3)
 | 
			
		||||
  if (argc - argidx >= 2)
 | 
			
		||||
  {
 | 
			
		||||
    numSamples = atol (argv[2]);
 | 
			
		||||
    numSamples = atol (argv[argidx+1]);
 | 
			
		||||
  }
 | 
			
		||||
  if (argc >= 4)
 | 
			
		||||
  if (argc - argidx >= 3)
 | 
			
		||||
  {
 | 
			
		||||
    timeOut = atol (argv[3]);
 | 
			
		||||
  }
 | 
			
		||||
  if (invalidargs || (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0)))
 | 
			
		||||
  {
 | 
			
		||||
    printf ("Usage (parameters must be supplied in order):\n"
 | 
			
		||||
            "./ping [payloadSize (bytes, 0 - 65536)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]\n"
 | 
			
		||||
            "./ping quit - ping sends a quit signal to pong.\n"
 | 
			
		||||
            "Defaults:\n"
 | 
			
		||||
            "./ping 0 0 0\n");
 | 
			
		||||
    return EXIT_FAILURE;
 | 
			
		||||
    timeOut = atol (argv[argidx+2]);
 | 
			
		||||
  }
 | 
			
		||||
  if (invalidargs || (argc - argidx == 1 && (strcmp (argv[argidx], "-h") == 0 || strcmp (argv[argidx], "--help") == 0)))
 | 
			
		||||
    usage();
 | 
			
		||||
  printf ("# payloadSize: %lu | numSamples: %llu | timeOut: %" PRIi64 "\n\n", payloadSize, numSamples, timeOut);
 | 
			
		||||
 | 
			
		||||
  pub_data.payload._length = payloadSize;
 | 
			
		||||
| 
						 | 
				
			
			@ -245,13 +330,12 @@ int main (int argc, char *argv[])
 | 
			
		|||
 | 
			
		||||
  startTime = dds_time ();
 | 
			
		||||
  printf ("# Waiting for startup jitter to stabilise\n");
 | 
			
		||||
  /* Write a sample that pong can send back */
 | 
			
		||||
  while (!dds_triggered (waitSet) && difference < DDS_SECS(5))
 | 
			
		||||
  {
 | 
			
		||||
    status = dds_write (writer, &pub_data);
 | 
			
		||||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
    status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
 | 
			
		||||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
    if (status > 0) /* data */
 | 
			
		||||
    if (status > 0 && listener == NULL) /* data */
 | 
			
		||||
    {
 | 
			
		||||
      status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
 | 
			
		||||
      DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
| 
						 | 
				
			
			@ -266,93 +350,26 @@ int main (int argc, char *argv[])
 | 
			
		|||
    printf("# Warm up complete.\n\n");
 | 
			
		||||
 | 
			
		||||
    printf("# Round trip measurements (in us)\n");
 | 
			
		||||
    printf("#             Round trip time [us]         Write-access time [us]       Read-access time [us]\n");
 | 
			
		||||
    printf("# Seconds     Count   median      min      Count   median      min      Count   median      min\n");
 | 
			
		||||
    printf("#             Round trip time [us]                           Write-access time [us]       Read-access time [us]\n");
 | 
			
		||||
    printf("# Seconds     Count   median      min      99%%      max      Count   median      min      Count   median      min\n");
 | 
			
		||||
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  exampleResetTimeStats (&roundTrip);
 | 
			
		||||
  exampleResetTimeStats (&writeAccess);
 | 
			
		||||
  exampleResetTimeStats (&readAccess);
 | 
			
		||||
  startTime = dds_time ();
 | 
			
		||||
  /* Write a sample that pong can send back */
 | 
			
		||||
  preWriteTime = dds_time ();
 | 
			
		||||
  status = dds_write_ts (writer, &pub_data, preWriteTime);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  postWriteTime = dds_time ();
 | 
			
		||||
  for (i = 0; !dds_triggered (waitSet) && (!numSamples || i < numSamples); i++)
 | 
			
		||||
  {
 | 
			
		||||
    /* Write a sample that pong can send back */
 | 
			
		||||
    preWriteTime = dds_time ();
 | 
			
		||||
    status = dds_write (writer, &pub_data);
 | 
			
		||||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
    postWriteTime = dds_time ();
 | 
			
		||||
 | 
			
		||||
    /* Wait for response from pong */
 | 
			
		||||
    status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout);
 | 
			
		||||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
    if (status != 0)
 | 
			
		||||
    {
 | 
			
		||||
      /* Take sample and check that it is valid */
 | 
			
		||||
      preTakeTime = dds_time ();
 | 
			
		||||
      status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
 | 
			
		||||
      DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
      postTakeTime = dds_time ();
 | 
			
		||||
 | 
			
		||||
      if (!dds_triggered (waitSet))
 | 
			
		||||
      {
 | 
			
		||||
        if (status != 1)
 | 
			
		||||
        {
 | 
			
		||||
          fprintf (stdout, "%s%d%s", "ERROR: Ping received ", status,
 | 
			
		||||
                  " samples but was expecting 1. Are multiple pong applications running?\n");
 | 
			
		||||
 | 
			
		||||
          goto done;
 | 
			
		||||
        }
 | 
			
		||||
        else if (!info[0].valid_data)
 | 
			
		||||
        {
 | 
			
		||||
          printf ("ERROR: Ping received an invalid sample. Has pong terminated already?\n");
 | 
			
		||||
          goto done;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      /* Update stats */
 | 
			
		||||
      difference = (postWriteTime - preWriteTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
      writeAccess = *exampleAddTimingToTimeStats (&writeAccess, difference);
 | 
			
		||||
      writeAccessOverall = *exampleAddTimingToTimeStats (&writeAccessOverall, difference);
 | 
			
		||||
 | 
			
		||||
      difference = (postTakeTime - preTakeTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
      readAccess = *exampleAddTimingToTimeStats (&readAccess, difference);
 | 
			
		||||
      readAccessOverall = *exampleAddTimingToTimeStats (&readAccessOverall, difference);
 | 
			
		||||
 | 
			
		||||
      difference = (postTakeTime - preWriteTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
      roundTrip = *exampleAddTimingToTimeStats (&roundTrip, difference);
 | 
			
		||||
      roundTripOverall = *exampleAddTimingToTimeStats (&roundTripOverall, difference);
 | 
			
		||||
 | 
			
		||||
      /* Print stats each second */
 | 
			
		||||
      difference = (postTakeTime - startTime)/DDS_NSECS_IN_USEC;
 | 
			
		||||
      if (difference > US_IN_ONE_SEC || (i && i == numSamples))
 | 
			
		||||
      {
 | 
			
		||||
        printf
 | 
			
		||||
        (
 | 
			
		||||
          "%9" PRIi64 " %9lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n",
 | 
			
		||||
          elapsed + 1,
 | 
			
		||||
          roundTrip.count,
 | 
			
		||||
          exampleGetMedianFromTimeStats (&roundTrip),
 | 
			
		||||
          roundTrip.min,
 | 
			
		||||
          writeAccess.count,
 | 
			
		||||
          exampleGetMedianFromTimeStats (&writeAccess),
 | 
			
		||||
          writeAccess.min,
 | 
			
		||||
          readAccess.count,
 | 
			
		||||
          exampleGetMedianFromTimeStats (&readAccess),
 | 
			
		||||
          readAccess.min
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        exampleResetTimeStats (&roundTrip);
 | 
			
		||||
        exampleResetTimeStats (&writeAccess);
 | 
			
		||||
        exampleResetTimeStats (&readAccess);
 | 
			
		||||
        startTime = dds_time ();
 | 
			
		||||
        elapsed++;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      elapsed += waitTimeout / DDS_NSECS_IN_SEC;
 | 
			
		||||
    }
 | 
			
		||||
    if (timeOut && elapsed == timeOut)
 | 
			
		||||
    {
 | 
			
		||||
      dds_waitset_set_trigger (waitSet, true);
 | 
			
		||||
    if (status != 0 && listener == NULL) {
 | 
			
		||||
      data_available(reader, NULL);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -401,7 +418,7 @@ done:
 | 
			
		|||
  return EXIT_SUCCESS;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond)
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener)
 | 
			
		||||
{
 | 
			
		||||
  dds_return_t status;
 | 
			
		||||
  dds_entity_t topic;
 | 
			
		||||
| 
						 | 
				
			
			@ -415,9 +432,6 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_
 | 
			
		|||
  dds_qos_t *drQos;
 | 
			
		||||
  dds_qos_t *subQos;
 | 
			
		||||
 | 
			
		||||
  dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
 | 
			
		||||
  DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
  /* A DDS_Topic is created for our sample type on the domain participant. */
 | 
			
		||||
  topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", NULL, NULL);
 | 
			
		||||
  DDS_ERR_CHECK (topic, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
| 
						 | 
				
			
			@ -449,15 +463,18 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_
 | 
			
		|||
  /* A DDS_DataReader is created on the Subscriber & Topic with a modified QoS. */
 | 
			
		||||
  drQos = dds_qos_create ();
 | 
			
		||||
  dds_qset_reliability (drQos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
 | 
			
		||||
  *reader = dds_create_reader (subscriber, topic, drQos, NULL);
 | 
			
		||||
  *reader = dds_create_reader (subscriber, topic, drQos, listener);
 | 
			
		||||
  DDS_ERR_CHECK (*reader, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  dds_qos_delete (drQos);
 | 
			
		||||
 | 
			
		||||
  waitSet = dds_create_waitset (participant);
 | 
			
		||||
  *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE);
 | 
			
		||||
 | 
			
		||||
  status = dds_waitset_attach (waitSet, *readCond, *reader);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  if (listener == NULL) {
 | 
			
		||||
    *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE);
 | 
			
		||||
    status = dds_waitset_attach (waitSet, *readCond, *reader);
 | 
			
		||||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  } else {
 | 
			
		||||
    *readCond = 0;
 | 
			
		||||
  }
 | 
			
		||||
  status = dds_waitset_attach (waitSet, waitSet, waitSet);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -471,12 +488,10 @@ static void finalize_dds(dds_entity_t participant, dds_entity_t reader, dds_enti
 | 
			
		|||
  /* Disable callbacks */
 | 
			
		||||
  dds_set_enabled_status (reader, 0);
 | 
			
		||||
 | 
			
		||||
  status = dds_waitset_detach (waitSet, readCond);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  (void) dds_waitset_detach (waitSet, readCond);
 | 
			
		||||
  status = dds_waitset_detach (waitSet, waitSet);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  status = dds_delete (readCond);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  (void) dds_delete (readCond);
 | 
			
		||||
  status = dds_delete (waitSet);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  status = dds_delete (participant);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -10,7 +10,7 @@ static dds_entity_t waitSet;
 | 
			
		|||
#define MAX_SAMPLES 10
 | 
			
		||||
 | 
			
		||||
/* Forward declarations */
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond);
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener);
 | 
			
		||||
static void finalize_dds(dds_entity_t participant, dds_entity_t readCond, RoundTripModule_DataType data[MAX_SAMPLES]);
 | 
			
		||||
 | 
			
		||||
#ifdef _WIN32
 | 
			
		||||
| 
						 | 
				
			
			@ -27,21 +27,58 @@ static void CtrlHandler (int fdwCtrlType)
 | 
			
		|||
}
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
static RoundTripModule_DataType data[MAX_SAMPLES];
 | 
			
		||||
static void * samples[MAX_SAMPLES];
 | 
			
		||||
static dds_sample_info_t info[MAX_SAMPLES];
 | 
			
		||||
 | 
			
		||||
static dds_entity_t participant;
 | 
			
		||||
static dds_entity_t reader;
 | 
			
		||||
static dds_entity_t writer;
 | 
			
		||||
static dds_entity_t readCond;
 | 
			
		||||
 | 
			
		||||
static void data_available(dds_entity_t reader, void *arg)
 | 
			
		||||
{
 | 
			
		||||
  int status, samplecount;
 | 
			
		||||
  (void)arg;
 | 
			
		||||
  samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
 | 
			
		||||
  DDS_ERR_CHECK (samplecount, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  for (int j = 0; !dds_triggered (waitSet) && j < samplecount; j++)
 | 
			
		||||
  {
 | 
			
		||||
    /* If writer has been disposed terminate pong */
 | 
			
		||||
 | 
			
		||||
    if (info[j].instance_state == DDS_IST_NOT_ALIVE_DISPOSED)
 | 
			
		||||
    {
 | 
			
		||||
      printf ("Received termination request. Terminating.\n");
 | 
			
		||||
      dds_waitset_set_trigger (waitSet, true);
 | 
			
		||||
      break;
 | 
			
		||||
    }
 | 
			
		||||
    else if (info[j].valid_data)
 | 
			
		||||
    {
 | 
			
		||||
      /* If sample is valid, send it back to ping */
 | 
			
		||||
      RoundTripModule_DataType * valid_sample = &data[j];
 | 
			
		||||
      status = dds_write_ts (writer, valid_sample, info[j].source_timestamp);
 | 
			
		||||
      DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int main (int argc, char *argv[])
 | 
			
		||||
{
 | 
			
		||||
  dds_duration_t waitTimeout = DDS_INFINITY;
 | 
			
		||||
  unsigned int i;
 | 
			
		||||
  int status, samplecount, j;
 | 
			
		||||
  int status;
 | 
			
		||||
  dds_attach_t wsresults[1];
 | 
			
		||||
  size_t wsresultsize = 1U;
 | 
			
		||||
  dds_entity_t participant;
 | 
			
		||||
  dds_entity_t reader;
 | 
			
		||||
  dds_entity_t writer;
 | 
			
		||||
  dds_entity_t readCond;
 | 
			
		||||
 | 
			
		||||
  RoundTripModule_DataType data[MAX_SAMPLES];
 | 
			
		||||
  void * samples[MAX_SAMPLES];
 | 
			
		||||
  dds_sample_info_t info[MAX_SAMPLES];
 | 
			
		||||
  dds_listener_t *listener = NULL;
 | 
			
		||||
  bool use_listener = false;
 | 
			
		||||
  int argidx = 1;
 | 
			
		||||
 | 
			
		||||
  if (argc > argidx && strcmp(argv[argidx], "-l") == 0)
 | 
			
		||||
  {
 | 
			
		||||
    argidx++;
 | 
			
		||||
    use_listener = true;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* Register handler for Ctrl-C */
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -55,8 +92,6 @@ int main (int argc, char *argv[])
 | 
			
		|||
  sigaction (SIGINT, &sat, &oldAction);
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
  participant = prepare_dds(&writer, &reader, &readCond);
 | 
			
		||||
 | 
			
		||||
  /* Initialize sample data */
 | 
			
		||||
  memset (data, 0, sizeof (data));
 | 
			
		||||
  for (i = 0; i < MAX_SAMPLES; i++)
 | 
			
		||||
| 
						 | 
				
			
			@ -64,6 +99,17 @@ int main (int argc, char *argv[])
 | 
			
		|||
    samples[i] = &data[i];
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
 | 
			
		||||
  DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
  if (use_listener)
 | 
			
		||||
  {
 | 
			
		||||
    listener = dds_listener_create(NULL);
 | 
			
		||||
    dds_lset_data_available(listener, data_available);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  (void)prepare_dds(&writer, &reader, &readCond, listener);
 | 
			
		||||
 | 
			
		||||
  while (!dds_triggered (waitSet))
 | 
			
		||||
  {
 | 
			
		||||
    /* Wait for a sample from ping */
 | 
			
		||||
| 
						 | 
				
			
			@ -72,25 +118,8 @@ int main (int argc, char *argv[])
 | 
			
		|||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
    /* Take samples */
 | 
			
		||||
    samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
 | 
			
		||||
    DDS_ERR_CHECK (samplecount, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
    for (j = 0; !dds_triggered (waitSet) && j < samplecount; j++)
 | 
			
		||||
    {
 | 
			
		||||
      /* If writer has been disposed terminate pong */
 | 
			
		||||
 | 
			
		||||
      if (info[j].instance_state == DDS_IST_NOT_ALIVE_DISPOSED)
 | 
			
		||||
      {
 | 
			
		||||
        printf ("Received termination request. Terminating.\n");
 | 
			
		||||
        dds_waitset_set_trigger (waitSet, true);
 | 
			
		||||
        break;
 | 
			
		||||
      }
 | 
			
		||||
      else if (info[j].valid_data)
 | 
			
		||||
      {
 | 
			
		||||
        /* If sample is valid, send it back to ping */
 | 
			
		||||
        RoundTripModule_DataType * valid_sample = &data[j];
 | 
			
		||||
        status = dds_write (writer, valid_sample);
 | 
			
		||||
        DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
      }
 | 
			
		||||
    if (listener == NULL) {
 | 
			
		||||
      data_available (reader, 0);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -108,12 +137,11 @@ int main (int argc, char *argv[])
 | 
			
		|||
 | 
			
		||||
static void finalize_dds(dds_entity_t participant, dds_entity_t readCond, RoundTripModule_DataType data[MAX_SAMPLES])
 | 
			
		||||
{
 | 
			
		||||
  dds_return_t status = dds_waitset_detach (waitSet, readCond);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  dds_return_t status;
 | 
			
		||||
  (void)dds_waitset_detach (waitSet, readCond);
 | 
			
		||||
  status = dds_waitset_detach (waitSet, waitSet);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  status = dds_delete (readCond);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  (void)dds_delete (readCond);
 | 
			
		||||
  status = dds_delete (waitSet);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  status = dds_delete (participant);
 | 
			
		||||
| 
						 | 
				
			
			@ -125,7 +153,7 @@ static void finalize_dds(dds_entity_t participant, dds_entity_t readCond, RoundT
 | 
			
		|||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond)
 | 
			
		||||
static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener)
 | 
			
		||||
{
 | 
			
		||||
  const char *pubPartitions[] = { "pong" };
 | 
			
		||||
  const char *subPartitions[] = { "ping" };
 | 
			
		||||
| 
						 | 
				
			
			@ -135,9 +163,6 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_
 | 
			
		|||
  dds_entity_t topic;
 | 
			
		||||
  dds_return_t status;
 | 
			
		||||
 | 
			
		||||
  dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
 | 
			
		||||
  DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
  /* A DDS Topic is created for our sample type on the domain participant. */
 | 
			
		||||
 | 
			
		||||
  topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", NULL, NULL);
 | 
			
		||||
| 
						 | 
				
			
			@ -174,14 +199,18 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_
 | 
			
		|||
 | 
			
		||||
  qos = dds_qos_create ();
 | 
			
		||||
  dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10));
 | 
			
		||||
  *reader = dds_create_reader (subscriber, topic, qos, NULL);
 | 
			
		||||
  *reader = dds_create_reader (subscriber, topic, qos, listener);
 | 
			
		||||
  DDS_ERR_CHECK (*reader, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  dds_qos_delete (qos);
 | 
			
		||||
 | 
			
		||||
  waitSet = dds_create_waitset (participant);
 | 
			
		||||
  *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE);
 | 
			
		||||
  status = dds_waitset_attach (waitSet, *readCond, *reader);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  if (listener == NULL) {
 | 
			
		||||
    *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE);
 | 
			
		||||
    status = dds_waitset_attach (waitSet, *readCond, *reader);
 | 
			
		||||
    DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
  } else {
 | 
			
		||||
    *readCond = 0;
 | 
			
		||||
  }
 | 
			
		||||
  status = dds_waitset_attach (waitSet, waitSet, waitSet);
 | 
			
		||||
  DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue