Initial contribution

This commit is contained in:
Michiel Beemster 2018-04-10 17:03:59 +02:00
parent 7b5cc4fa59
commit 11d9ce37aa
580 changed files with 155133 additions and 162 deletions

View file

@ -0,0 +1,33 @@
#
# Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
# v. 1.0 which is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
cmake_minimum_required(VERSION 3.5)
if (NOT TARGET CycloneDDS::ddsc)
# Find the CycloneDDS package. If it is not in a default location, try
# finding it relative to the example where it most likely resides.
find_package(CycloneDDS REQUIRED PATHS "${CMAKE_SOURCE_DIR}/../../")
endif()
# This is a convenience function, provided by the CycloneDDS package,
# that will supply a library target related the the given idl file.
# In short, it takes the idl file, generates the source files with
# the proper data types and compiles them into a library.
idlc_generate(Throughput_lib Throughput.idl)
# Both executables have only one related source file.
add_executable(ThroughputPublisher publisher.c)
add_executable(ThroughputSubscriber subscriber.c)
# Both executables need to be linked to the idl data type library and
# the ddsc API library.
target_link_libraries(ThroughputPublisher Throughput_lib CycloneDDS::ddsc)
target_link_libraries(ThroughputSubscriber Throughput_lib CycloneDDS::ddsc)

View file

@ -0,0 +1,9 @@
module ThroughputModule
{
struct DataType
{
unsigned long long count;
sequence<octet> payload;
};
#pragma keylist DataType
};

View file

@ -0,0 +1,309 @@
#include "ddsc/dds.h"
#include "Throughput.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
/*
* The Throughput example measures data throughput in bytes per second. The publisher
* allows you to specify a payload size in bytes as well as allowing you to specify
* whether to send data in bursts. The publisher will continue to send data forever
* unless a time out is specified. The subscriber will receive data and output the
* total amount received and the data rate in bytes per second. It will also indicate
* if any samples were received out of order. A maximum number of cycles can be
* specified and once this has been reached the subscriber will terminate and output
* totals and averages.
*/
#define MAX_SAMPLES 100
static bool done = false;
/* Forward declarations */
static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant);
static void start_writing(dds_entity_t writer, ThroughputModule_DataType *sample,
unsigned int burstInterval, unsigned int burstSize, unsigned int timeOut);
static int parse_args(int argc, char **argv, uint32_t *payloadSize, unsigned int *burstInterval,
unsigned int *burstSize, unsigned int *timeOut, char **partitionName);
static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName);
static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample);
/* Functions to handle Ctrl-C presses. */
#ifdef _WIN32
#include <Windows.h>
static int CtrlHandler (DWORD fdwCtrlType)
{
done = true;
return true; /* Don't let other handlers handle this key */
}
#else
struct sigaction oldAction;
static void CtrlHandler (int fdwCtrlType)
{
done = true;
}
#endif
int main (int argc, char **argv)
{
int result = EXIT_SUCCESS;
uint32_t payloadSize = 8192;
unsigned int burstInterval = 0;
unsigned int burstSize = 1;
unsigned int timeOut = 0;
char * partitionName = "Throughput example";
dds_entity_t participant;
dds_entity_t writer;
ThroughputModule_DataType sample;
/* Register handler for Ctrl-C */
#ifdef _WIN32
SetConsoleCtrlHandler ((PHANDLER_ROUTINE) CtrlHandler, true);
#else
struct sigaction sat;
sat.sa_handler = CtrlHandler;
sigemptyset (&sat.sa_mask);
sat.sa_flags = 0;
sigaction (SIGINT, &sat, &oldAction);
#endif
if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
return EXIT_FAILURE;
}
participant = prepare_dds(&writer, partitionName);
/* Wait until have a reader */
if (wait_for_reader(writer, participant) == 0) {
printf ("=== [Publisher] Did not discover a reader.\n");
DDS_ERR_CHECK (dds_delete (participant), DDS_CHECK_REPORT | DDS_CHECK_EXIT);
return EXIT_FAILURE;
}
/* Fill the sample payload with data */
sample.count = 0;
sample.payload._buffer = dds_alloc (payloadSize);
sample.payload._length = payloadSize;
sample.payload._release = true;
for (uint32_t i = 0; i < payloadSize; i++) {
sample.payload._buffer[i] = 'a';
}
/* Register the sample instance and write samples repeatedly or until time out */
start_writing(writer, &sample, burstInterval, burstSize, timeOut);
#ifdef _WIN32
SetConsoleCtrlHandler (0, false);
#else
sigaction (SIGINT, &oldAction, 0);
#endif
/* Cleanup */
finalize_dds(participant, writer, sample);
}
static int parse_args(
int argc,
char **argv,
uint32_t *payloadSize,
unsigned int *burstInterval,
unsigned int *burstSize,
unsigned int *timeOut,
char **partitionName)
{
int result = EXIT_SUCCESS;
/*
* Get the program parameters
* Parameters: publisher [payloadSize] [burstInterval] [burstSize] [timeOut] [partitionName]
*/
if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
{
printf ("Usage (parameters must be supplied in order):\n");
printf ("./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]\n");
printf ("Defaults:\n");
printf ("./publisher 8192 0 1 0 \"Throughput example\"\n");
return EXIT_FAILURE;
}
if (argc > 1)
{
*payloadSize = atoi (argv[1]); /* The size of the payload in bytes */
}
if (argc > 2)
{
*burstInterval = atoi (argv[2]); /* The time interval between each burst in ms */
}
if (argc > 3)
{
*burstSize = atoi (argv[3]); /* The number of samples to send each burst */
}
if (argc > 4)
{
*timeOut = atoi (argv[4]); /* The number of seconds the publisher should run for (0 = infinite) */
}
if (argc > 5)
{
*partitionName = argv[5]; /* The name of the partition */
}
printf ("payloadSize: %u bytes burstInterval: %u ms burstSize: %u timeOut: %u seconds partitionName: %s\n",
*payloadSize, *burstInterval, *burstSize, *timeOut, *partitionName);
return result;
}
static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName)
{
dds_entity_t participant;
dds_entity_t topic;
dds_entity_t publisher;
const char *pubParts[1];
dds_qos_t *pubQos;
dds_qos_t *dwQos;
/* A domain participant is created for the default domain. */
participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
/* A topic is created for our sample type on the domain participant. */
topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", NULL, NULL);
DDS_ERR_CHECK (topic, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
/* A publisher is created on the domain participant. */
pubQos = dds_qos_create ();
pubParts[0] = partitionName;
dds_qset_partition (pubQos, 1, pubParts);
publisher = dds_create_publisher (participant, pubQos, NULL);
DDS_ERR_CHECK (publisher, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
dds_qos_delete (pubQos);
/* A DataWriter is created on the publisher. */
dwQos = dds_qos_create ();
dds_qset_reliability (dwQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
dds_qset_history (dwQos, DDS_HISTORY_KEEP_ALL, 0);
dds_qset_resource_limits (dwQos, MAX_SAMPLES, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
*writer = dds_create_writer (publisher, topic, dwQos, NULL);
DDS_ERR_CHECK (*writer, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
dds_qos_delete (dwQos);
/* Enable write batching */
dds_write_set_batch (true);
return participant;
}
static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant)
{
printf ("\n=== [Publisher] Waiting for a reader ...\n");
dds_return_t ret;
dds_entity_t waitset;
ret = dds_set_enabled_status(writer, DDS_PUBLICATION_MATCHED_STATUS);
DDS_ERR_CHECK (ret, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
waitset = dds_create_waitset(participant);
DDS_ERR_CHECK (waitset, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
ret = dds_waitset_attach(waitset, writer, (dds_attach_t)NULL);
DDS_ERR_CHECK (waitset, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
ret = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(30));
DDS_ERR_CHECK (ret, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
return ret;
}
static void start_writing(
dds_entity_t writer,
ThroughputModule_DataType *sample,
unsigned int burstInterval,
unsigned int burstSize,
unsigned int timeOut)
{
bool timedOut = false;
dds_time_t pubStart = dds_time ();
dds_time_t now;
dds_time_t deltaTv;
dds_return_t status;
if (!done)
{
dds_time_t burstStart = pubStart;
unsigned int burstCount = 0;
printf ("=== [Publisher] Writing samples...\n");
while (!done && !timedOut)
{
/* Write data until burst size has been reached */
if (burstCount < burstSize)
{
status = dds_write (writer, sample);
if (dds_err_nr(status) == DDS_RETCODE_TIMEOUT)
{
timedOut = true;
}
else
{
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
sample->count++;
burstCount++;
}
}
else if (burstInterval)
{
/* Sleep until burst interval has passed */
dds_time_t time = dds_time ();
deltaTv = time - burstStart;
if (deltaTv < DDS_MSECS (burstInterval))
{
dds_write_flush (writer);
dds_sleepfor (DDS_MSECS (burstInterval) - deltaTv);
}
burstStart = dds_time ();
burstCount = 0;
}
else
{
burstCount = 0;
}
if (timeOut)
{
now = dds_time ();
deltaTv = now - pubStart;
if ((deltaTv) > DDS_SECS (timeOut))
{
timedOut = true;
}
}
}
dds_write_flush (writer);
if (done)
{
printf ("=== [Publisher] Terminated, %llu samples written.\n", (unsigned long long) sample->count);
}
else
{
printf ("=== [Publisher] Timed out, %llu samples written.\n", (unsigned long long) sample->count);
}
}
}
static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample)
{
dds_return_t status = dds_dispose (writer, &sample);
if (dds_err_nr (status) != DDS_RETCODE_TIMEOUT)
{
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
}
dds_free (sample.payload._buffer);
status = dds_delete (participant);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
}

View file

@ -0,0 +1,99 @@
..
Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
This program and the accompanying materials are made available under the
terms of the Eclipse Public License v. 2.0 which is available at
http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
v. 1.0 which is available at
http://www.eclipse.org/org/documents/edl-v10.php.
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
Throughput
==========
Description
***********
The Throughput example allows the measurement of data throughput when receiving samples from a publisher.
Design
******
It consists of 2 units:
- Publisher: sends samples at a specified size and rate.
- Subscriber: Receives samples and outputs statistics about throughput
Scenario
********
The **publisher** sends samples and allows you to specify a payload size in bytes as well as allowing you to specify whether
to send data in bursts. The **publisher** will continue to send data forever unless a time-out is specified.
Configurable:
- payloadSize: the size of the payload in bytes
- burstInterval: the time interval between each burst in ms
- burstSize: the number of samples to send each burst
- timeOut: the number of seconds the publisher should run for (0=infinite)
- partitionName: the name of the partition
The **subscriber** will receive data and output the total amount received and the data-rate in bytes-per-second. It will
also indicate if any samples were received out-of-order. A maximum number of cycles can be specified and once this has
been reached the subscriber will terminate and output totals and averages.
The **subscriber** executable measures:
- transferred: the total amount of data transferred in bytes.
- outOfOrder: the number of samples that were received out of order.
- transfer rate: the data transfer rate in bytes per second.
- subscriber also calculates statistics on these values over a configurable number of cycles.
Configurable:
- maxCycles: the number of times to output statistics before terminating
- pollingDelay
- partitionName: the name of the partition
Running the example
*******************
It is recommended that you run ping and pong in separate terminals to avoid mixing the output.
- Open 2 terminals.
- In the first terminal start Publisher by running publisher
publisher usage (parameters must be supplied in order):
``./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
defaults:
``./publisher 8192 0 1 0 "Throughput example"``
- In the second terminal start Ping by running subscriber
subscriber usage (parameters must be supplied in order):
``./subscriber [maxCycles (0=infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``
defaults:
``./subscriber 0 0 "Throughput example"``
- To achieve optimal performance it is recommended to set the CPU affinity so that ping and pong run on separate CPU cores,
and use real-time scheduling. In a Linux environment this can be achieved as follows:
publisher usage:
``taskset -c 0 chrt -f 80 ./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
subscriber usage:
``taskset -c 1 chrt -f 80 ./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``
On Windows the CPU affinity and prioritized scheduling class can be set as follows:
publisher usage:
``START /affinity 1 /high cmd /k "publisher.exe" [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
subscriber usage:
``START /affinity 2 /high cmd /k "subscriber.exe" [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``

View file

@ -0,0 +1,426 @@
#include "ddsc/dds.h"
#include "Throughput.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <assert.h>
/*
* The Throughput example measures data throughput in bytes per second. The publisher
* allows you to specify a payload size in bytes as well as allowing you to specify
* whether to send data in bursts. The publisher will continue to send data forever
* unless a time out is specified. The subscriber will receive data and output the
* total amount received and the data rate in bytes per second. It will also indicate
* if any samples were received out of order. A maximum number of cycles can be
* specified and once this has been reached the subscriber will terminate and output
* totals and averages.
*/
#define BYTES_PER_SEC_TO_MEGABITS_PER_SEC 125000
#define MAX_SAMPLES 100
typedef struct HandleEntry
{
dds_instance_handle_t handle;
unsigned long long count;
struct HandleEntry * next;
} HandleEntry;
typedef struct HandleMap
{
HandleEntry *entries;
} HandleMap;
static unsigned long pollingDelay = 0;
static HandleMap * imap;
static unsigned long long outOfOrder = 0;
static unsigned long long total_bytes = 0;
static unsigned long long total_samples = 0;
static dds_time_t startTime = 0;
static dds_time_t time_now = 0;
static dds_time_t prev_time = 0;
static unsigned long payloadSize = 0;
static ThroughputModule_DataType data [MAX_SAMPLES];
static void * samples[MAX_SAMPLES];
static dds_entity_t waitSet;
static dds_entity_t pollingWaitset;
static bool done = false;
/* Forward declarations */
static HandleMap * HandleMap__alloc (void);
static void HandleMap__free (HandleMap *map);
static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key);
static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key);
static void data_available_handler (dds_entity_t reader, void *arg);
static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName);
static void process_samples(unsigned long long maxCycles);
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName);
static void finalize_dds(dds_entity_t participant);
/* Functions to handle Ctrl-C presses. */
#ifdef _WIN32
#include <Windows.h>
static int CtrlHandler (DWORD fdwCtrlType)
{
dds_waitset_set_trigger (waitSet, true);
done = true;
return true; /* Don't let other handlers handle this key */
}
#else
struct sigaction oldAction;
static void CtrlHandler (int fdwCtrlType)
{
dds_waitset_set_trigger (waitSet, true);
done = true;
}
#endif
int main (int argc, char **argv)
{
unsigned long long maxCycles = 0;
char *partitionName = "Throughput example";
dds_entity_t participant;
dds_entity_t reader;
time_now = dds_time ();
prev_time = time_now;
/* Register handler for Ctrl-C */
#ifdef _WIN32
SetConsoleCtrlHandler((PHANDLER_ROUTINE) CtrlHandler, true);
#else
struct sigaction sat;
sat.sa_handler = CtrlHandler;
sigemptyset(&sat.sa_mask);
sat.sa_flags = 0;
sigaction (SIGINT, &sat, &oldAction);
#endif
if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
{
return EXIT_FAILURE;
}
printf ("Cycles: %llu | PollingDelay: %lu | Partition: %s\n",
maxCycles, pollingDelay, partitionName);
participant = prepare_dds(&reader, partitionName);
printf ("=== [Subscriber] Waiting for samples...\n");
/* Process samples until Ctrl-C is pressed or until maxCycles */
/* has been reached (0 = infinite) */
process_samples(maxCycles);
/* Finished, disable callbacks */
dds_set_enabled_status (reader, 0);
HandleMap__free (imap);
#ifdef _WIN32
SetConsoleCtrlHandler (0, FALSE);
#else
sigaction (SIGINT, &oldAction, 0);
#endif
/* Clean up */
finalize_dds(participant);
return EXIT_SUCCESS;
}
/*
* This struct contains all of the entities used in the publisher and subscriber.
*/
static HandleMap * HandleMap__alloc (void)
{
HandleMap * map = malloc (sizeof (*map));
assert(map);
memset (map, 0, sizeof (*map));
return map;
}
static void HandleMap__free (HandleMap *map)
{
HandleEntry * entry;
while (map->entries)
{
entry = map->entries;
map->entries = entry->next;
free (entry);
}
free (map);
}
static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key)
{
HandleEntry * entry = malloc (sizeof (*entry));
assert(entry);
memset (entry, 0, sizeof (*entry));
entry->handle = key;
entry->next = map->entries;
map->entries = entry;
return entry;
}
static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key)
{
HandleEntry * entry = map->entries;
while (entry)
{
if (entry->handle == key)
{
break;
}
entry = entry->next;
}
return entry;
}
static void data_available_handler (dds_entity_t reader, void *arg)
{
int samples_received;
dds_sample_info_t info [MAX_SAMPLES];
dds_instance_handle_t ph = 0;
HandleEntry * current = NULL;
if (startTime == 0)
{
startTime = dds_time ();
}
/* Take samples and iterate through them */
samples_received = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
DDS_ERR_CHECK (samples_received, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
for (int i = 0; !done && i < samples_received; i++)
{
if (info[i].valid_data)
{
ph = info[i].publication_handle;
current = retrieve_handle (imap, ph);
ThroughputModule_DataType * this_sample = &data[i];
if (current == NULL)
{
current = store_handle (imap, ph);
current->count = this_sample->count;
}
if (this_sample->count != current->count)
{
outOfOrder++;
}
current->count = this_sample->count + 1;
/* Add the sample payload size to the total received */
payloadSize = this_sample->payload._length;
total_bytes += payloadSize + 8;
total_samples++;
}
}
time_now = dds_time ();
if ((pollingDelay == 0) && (time_now > (prev_time + DDS_SECS (1))))
{
dds_waitset_set_trigger (pollingWaitset, true);
}
}
static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName)
{
/*
* Get the program parameters
* Parameters: subscriber [maxCycles] [pollingDelay] [partitionName]
*/
if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
{
printf ("Usage (parameters must be supplied in order):\n");
printf ("./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]\n");
printf ("Defaults:\n");
printf ("./subscriber 0 0 \"Throughput example\"\n");
return EXIT_FAILURE;
}
if (argc > 1)
{
*maxCycles = atoi (argv[1]); /* The number of times to output statistics before terminating */
}
if (argc > 2)
{
pollingDelay = atoi (argv[2]); /* The number of ms to wait between reads (0 = event based) */
}
if (argc > 3)
{
*partitionName = argv[3]; /* The name of the partition */
}
return EXIT_SUCCESS;
}
static void process_samples(unsigned long long maxCycles)
{
dds_return_t status;
unsigned long long prev_bytes = 0;
unsigned long long prev_samples = 0;
dds_attach_t wsresults[1];
size_t wsresultsize = 1U;
dds_time_t deltaTv;
bool first_batch = true;
unsigned long cycles = 0;
double deltaTime = 0;
while (!done && (maxCycles == 0 || cycles < maxCycles))
{
if (pollingDelay)
{
dds_sleepfor (DDS_MSECS (pollingDelay));
}
else
{
status = dds_waitset_wait (waitSet, wsresults, wsresultsize, DDS_INFINITY);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
if ((status > 0 ) && (dds_triggered (pollingWaitset)))
{
dds_waitset_set_trigger (pollingWaitset, false);
}
}
if (!first_batch)
{
deltaTv = time_now - prev_time;
deltaTime = (double) deltaTv / DDS_NSECS_IN_SEC;
prev_time = time_now;
printf
(
"=== [Subscriber] Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
"Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
payloadSize, total_samples, total_bytes, outOfOrder,
(deltaTime) ? ((total_samples - prev_samples) / deltaTime) : 0,
(deltaTime) ? (((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0
);
cycles++;
}
else
{
prev_time = time_now;
first_batch = false;
}
/* Update the previous values for next iteration */
prev_bytes = total_bytes;
prev_samples = total_samples;
}
/* Output totals and averages */
deltaTv = time_now - startTime;
deltaTime = (double) (deltaTv / DDS_NSECS_IN_SEC);
printf ("\nTotal received: %llu samples, %llu bytes\n", total_samples, total_bytes);
printf ("Out of order: %llu samples\n", outOfOrder);
printf ("Average transfer rate: %.2lf samples/s, ", total_samples / deltaTime);
printf ("%.2lf Mbit/s\n", (total_bytes / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
}
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
{
dds_return_t status;
dds_entity_t topic;
dds_entity_t subscriber;
dds_listener_t *rd_listener;
dds_entity_t participant;
uint32_t maxSamples = 400;
const char *subParts[1];
dds_qos_t *subQos = dds_qos_create ();
dds_qos_t *drQos = dds_qos_create ();
/* A Participant is created for the default domain. */
participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
/* A Topic is created for our sample type on the domain participant. */
topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", NULL, NULL);
DDS_ERR_CHECK (topic, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
/* A Subscriber is created on the domain participant. */
subParts[0] = partitionName;
dds_qset_partition (subQos, 1, subParts);
subscriber = dds_create_subscriber (participant, subQos, NULL);
DDS_ERR_CHECK (subscriber, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
dds_qos_delete (subQos);
/* A Reader is created on the Subscriber & Topic with a modified Qos. */
dds_qset_reliability (drQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
dds_qset_history (drQos, DDS_HISTORY_KEEP_ALL, 0);
dds_qset_resource_limits (drQos, maxSamples, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
rd_listener = dds_listener_create(NULL);
dds_lset_data_available(rd_listener, data_available_handler);
/* A Read Condition is created which is triggered when data is available to read */
waitSet = dds_create_waitset (participant);
DDS_ERR_CHECK (waitSet, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
pollingWaitset = dds_create_waitset (participant);
DDS_ERR_CHECK (pollingWaitset, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_waitset_attach (waitSet, pollingWaitset, pollingWaitset);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_waitset_attach (waitSet, waitSet, waitSet);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
imap = HandleMap__alloc ();
memset (data, 0, sizeof (data));
for (unsigned int i = 0; i < MAX_SAMPLES; i++)
{
samples[i] = &data[i];
}
*reader = dds_create_reader (subscriber, topic, drQos, rd_listener);
DDS_ERR_CHECK (*reader, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
dds_qos_delete (drQos);
dds_listener_delete(rd_listener);
return participant;
}
static void finalize_dds(dds_entity_t participant)
{
dds_return_t status;
for (unsigned int i = 0; i < MAX_SAMPLES; i++)
{
ThroughputModule_DataType_free (&data[i], DDS_FREE_CONTENTS);
}
status = dds_waitset_detach (waitSet, waitSet);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_waitset_detach (waitSet, pollingWaitset);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_delete (pollingWaitset);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_delete (waitSet);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_delete (participant);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
}