Reorganize repository
* Move the project top-level CMakeLists.txt to the root of the project; this allows building Cyclone as part of ROS2 without any special tricks; * Clean up the build options: ENABLE_SSL: whether to check for and include OpenSSL support if a library can be found (default = ON); this used to be called DDSC_ENABLE_OPENSSL, the old name is deprecated but still works BUILD_DOCS: whether to build docs (default = OFF) BUILD_TESTING: whether to build test (default = OFF) * Collect all documentation into top-level "docs" directory; * Move the examples to the top-level directory; * Remove the unused and somewhat misleading pseudo-default cyclonedds.xml; * Remove unused cmake files Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
4e80559763
commit
9cf4b97f1a
102 changed files with 627 additions and 1925 deletions
33
examples/throughput/CMakeLists.txt
Normal file
33
examples/throughput/CMakeLists.txt
Normal file
|
@ -0,0 +1,33 @@
|
|||
#
|
||||
# Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
|
||||
#
|
||||
# This program and the accompanying materials are made available under the
|
||||
# terms of the Eclipse Public License v. 2.0 which is available at
|
||||
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
|
||||
# v. 1.0 which is available at
|
||||
# http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
#
|
||||
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
||||
#
|
||||
cmake_minimum_required(VERSION 3.5)
|
||||
|
||||
if (NOT TARGET CycloneDDS::ddsc)
|
||||
# Find the CycloneDDS package. If it is not in a default location, try
|
||||
# finding it relative to the example where it most likely resides.
|
||||
find_package(CycloneDDS REQUIRED PATHS "${CMAKE_SOURCE_DIR}/../../")
|
||||
endif()
|
||||
|
||||
# This is a convenience function, provided by the CycloneDDS package,
|
||||
# that will supply a library target related the the given idl file.
|
||||
# In short, it takes the idl file, generates the source files with
|
||||
# the proper data types and compiles them into a library.
|
||||
idlc_generate(Throughput_lib Throughput.idl)
|
||||
|
||||
# Both executables have only one related source file.
|
||||
add_executable(ThroughputPublisher publisher.c)
|
||||
add_executable(ThroughputSubscriber subscriber.c)
|
||||
|
||||
# Both executables need to be linked to the idl data type library and
|
||||
# the ddsc API library.
|
||||
target_link_libraries(ThroughputPublisher Throughput_lib CycloneDDS::ddsc)
|
||||
target_link_libraries(ThroughputSubscriber Throughput_lib CycloneDDS::ddsc)
|
9
examples/throughput/Throughput.idl
Normal file
9
examples/throughput/Throughput.idl
Normal file
|
@ -0,0 +1,9 @@
|
|||
module ThroughputModule
|
||||
{
|
||||
struct DataType
|
||||
{
|
||||
unsigned long long count;
|
||||
sequence<octet> payload;
|
||||
};
|
||||
#pragma keylist DataType
|
||||
};
|
295
examples/throughput/publisher.c
Normal file
295
examples/throughput/publisher.c
Normal file
|
@ -0,0 +1,295 @@
|
|||
#include "dds/dds.h"
|
||||
#include "Throughput.h"
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
|
||||
/*
|
||||
* The Throughput example measures data throughput in bytes per second. The publisher
|
||||
* allows you to specify a payload size in bytes as well as allowing you to specify
|
||||
* whether to send data in bursts. The publisher will continue to send data forever
|
||||
* unless a time out is specified. The subscriber will receive data and output the
|
||||
* total amount received and the data rate in bytes per second. It will also indicate
|
||||
* if any samples were received out of order. A maximum number of cycles can be
|
||||
* specified and once this has been reached the subscriber will terminate and output
|
||||
* totals and averages.
|
||||
*/
|
||||
|
||||
#define MAX_SAMPLES 100
|
||||
|
||||
static bool done = false;
|
||||
|
||||
/* Forward declarations */
|
||||
static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant);
|
||||
static void start_writing(dds_entity_t writer, ThroughputModule_DataType *sample,
|
||||
int burstInterval, uint32_t burstSize, int timeOut);
|
||||
static int parse_args(int argc, char **argv, uint32_t *payloadSize, int *burstInterval,
|
||||
uint32_t *burstSize, int *timeOut, char **partitionName);
|
||||
static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName);
|
||||
static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample);
|
||||
|
||||
static void sigint (int sig)
|
||||
{
|
||||
(void)sig;
|
||||
done = true;
|
||||
}
|
||||
|
||||
int main (int argc, char **argv)
|
||||
{
|
||||
uint32_t payloadSize = 8192;
|
||||
int burstInterval = 0;
|
||||
uint32_t burstSize = 1;
|
||||
int timeOut = 0;
|
||||
char * partitionName = "Throughput example";
|
||||
dds_entity_t participant;
|
||||
dds_entity_t writer;
|
||||
dds_return_t rc;
|
||||
ThroughputModule_DataType sample;
|
||||
|
||||
if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
participant = prepare_dds(&writer, partitionName);
|
||||
|
||||
/* Wait until have a reader */
|
||||
if (wait_for_reader(writer, participant) == 0) {
|
||||
printf ("=== [Publisher] Did not discover a reader.\n");
|
||||
fflush (stdout);
|
||||
rc = dds_delete (participant);
|
||||
if (rc < 0)
|
||||
DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
/* Fill the sample payload with data */
|
||||
sample.count = 0;
|
||||
sample.payload._buffer = dds_alloc (payloadSize);
|
||||
sample.payload._length = payloadSize;
|
||||
sample.payload._release = true;
|
||||
for (uint32_t i = 0; i < payloadSize; i++) {
|
||||
sample.payload._buffer[i] = 'a';
|
||||
}
|
||||
|
||||
/* Register handler for Ctrl-C */
|
||||
signal (SIGINT, sigint);
|
||||
|
||||
/* Register the sample instance and write samples repeatedly or until time out */
|
||||
start_writing(writer, &sample, burstInterval, burstSize, timeOut);
|
||||
|
||||
/* Cleanup */
|
||||
finalize_dds(participant, writer, sample);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
static int parse_args(
|
||||
int argc,
|
||||
char **argv,
|
||||
uint32_t *payloadSize,
|
||||
int *burstInterval,
|
||||
uint32_t *burstSize,
|
||||
int *timeOut,
|
||||
char **partitionName)
|
||||
{
|
||||
int result = EXIT_SUCCESS;
|
||||
/*
|
||||
* Get the program parameters
|
||||
* Parameters: publisher [payloadSize] [burstInterval] [burstSize] [timeOut] [partitionName]
|
||||
*/
|
||||
if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
|
||||
{
|
||||
printf ("Usage (parameters must be supplied in order):\n");
|
||||
printf ("./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]\n");
|
||||
printf ("Defaults:\n");
|
||||
printf ("./publisher 8192 0 1 0 \"Throughput example\"\n");
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
if (argc > 1)
|
||||
{
|
||||
*payloadSize = (uint32_t) atoi (argv[1]); /* The size of the payload in bytes */
|
||||
}
|
||||
if (argc > 2)
|
||||
{
|
||||
*burstInterval = atoi (argv[2]); /* The time interval between each burst in ms */
|
||||
}
|
||||
if (argc > 3)
|
||||
{
|
||||
*burstSize = (uint32_t) atoi (argv[3]); /* The number of samples to send each burst */
|
||||
}
|
||||
if (argc > 4)
|
||||
{
|
||||
*timeOut = atoi (argv[4]); /* The number of seconds the publisher should run for (0 = infinite) */
|
||||
}
|
||||
if (argc > 5)
|
||||
{
|
||||
*partitionName = argv[5]; /* The name of the partition */
|
||||
}
|
||||
|
||||
printf ("payloadSize: %"PRIu32" bytes burstInterval: %u ms burstSize: %"PRId32" timeOut: %u seconds partitionName: %s\n",
|
||||
*payloadSize, *burstInterval, *burstSize, *timeOut, *partitionName);
|
||||
fflush (stdout);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName)
|
||||
{
|
||||
dds_entity_t participant;
|
||||
dds_entity_t topic;
|
||||
dds_entity_t publisher;
|
||||
const char *pubParts[1];
|
||||
dds_qos_t *pubQos;
|
||||
dds_qos_t *dwQos;
|
||||
|
||||
/* A domain participant is created for the default domain. */
|
||||
participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
|
||||
if (participant < 0)
|
||||
DDS_FATAL("dds_create_participant: %s\n", dds_strretcode(-participant));
|
||||
|
||||
/* A topic is created for our sample type on the domain participant. */
|
||||
topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", NULL, NULL);
|
||||
if (topic < 0)
|
||||
DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
|
||||
|
||||
/* A publisher is created on the domain participant. */
|
||||
pubQos = dds_create_qos ();
|
||||
pubParts[0] = partitionName;
|
||||
dds_qset_partition (pubQos, 1, pubParts);
|
||||
publisher = dds_create_publisher (participant, pubQos, NULL);
|
||||
if (publisher < 0)
|
||||
DDS_FATAL("dds_create_publisher: %s\n", dds_strretcode(-publisher));
|
||||
dds_delete_qos (pubQos);
|
||||
|
||||
/* A DataWriter is created on the publisher. */
|
||||
dwQos = dds_create_qos ();
|
||||
dds_qset_reliability (dwQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
|
||||
dds_qset_history (dwQos, DDS_HISTORY_KEEP_ALL, 0);
|
||||
dds_qset_resource_limits (dwQos, MAX_SAMPLES, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
|
||||
*writer = dds_create_writer (publisher, topic, dwQos, NULL);
|
||||
if (*writer < 0)
|
||||
DDS_FATAL("dds_create_writer: %s\n", dds_strretcode(-*writer));
|
||||
dds_delete_qos (dwQos);
|
||||
|
||||
/* Enable write batching */
|
||||
dds_write_set_batch (true);
|
||||
|
||||
return participant;
|
||||
}
|
||||
|
||||
static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant)
|
||||
{
|
||||
printf ("\n=== [Publisher] Waiting for a reader ...\n");
|
||||
fflush (stdout);
|
||||
|
||||
dds_return_t rc;
|
||||
dds_entity_t waitset;
|
||||
|
||||
rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
|
||||
if (rc < 0)
|
||||
DDS_FATAL("dds_set_status_mask: %s\n", dds_strretcode(-rc));
|
||||
|
||||
waitset = dds_create_waitset(participant);
|
||||
if (waitset < 0)
|
||||
DDS_FATAL("dds_create_waitset: %s\n", dds_strretcode(-waitset));
|
||||
|
||||
rc = dds_waitset_attach(waitset, writer, (dds_attach_t)NULL);
|
||||
if (rc < 0)
|
||||
DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-rc));
|
||||
|
||||
rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(30));
|
||||
if (rc < 0)
|
||||
DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-rc));
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void start_writing(
|
||||
dds_entity_t writer,
|
||||
ThroughputModule_DataType *sample,
|
||||
int burstInterval,
|
||||
uint32_t burstSize,
|
||||
int timeOut)
|
||||
{
|
||||
bool timedOut = false;
|
||||
dds_time_t pubStart = dds_time ();
|
||||
dds_time_t now;
|
||||
dds_time_t deltaTv;
|
||||
dds_return_t status;
|
||||
|
||||
if (!done)
|
||||
{
|
||||
dds_time_t burstStart = pubStart;
|
||||
unsigned int burstCount = 0;
|
||||
|
||||
printf ("=== [Publisher] Writing samples...\n");
|
||||
fflush (stdout);
|
||||
|
||||
while (!done && !timedOut)
|
||||
{
|
||||
/* Write data until burst size has been reached */
|
||||
|
||||
if (burstCount < burstSize)
|
||||
{
|
||||
status = dds_write (writer, sample);
|
||||
if (status == DDS_RETCODE_TIMEOUT)
|
||||
{
|
||||
timedOut = true;
|
||||
}
|
||||
else if (status < 0)
|
||||
{
|
||||
DDS_FATAL("dds_write: %s\n", dds_strretcode(-status));
|
||||
}
|
||||
else
|
||||
{
|
||||
sample->count++;
|
||||
burstCount++;
|
||||
}
|
||||
}
|
||||
else if (burstInterval)
|
||||
{
|
||||
/* Sleep until burst interval has passed */
|
||||
|
||||
dds_time_t time = dds_time ();
|
||||
deltaTv = time - burstStart;
|
||||
if (deltaTv < DDS_MSECS (burstInterval))
|
||||
{
|
||||
dds_write_flush (writer);
|
||||
dds_sleepfor (DDS_MSECS (burstInterval) - deltaTv);
|
||||
}
|
||||
burstStart = dds_time ();
|
||||
burstCount = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
burstCount = 0;
|
||||
}
|
||||
|
||||
if (timeOut)
|
||||
{
|
||||
now = dds_time ();
|
||||
deltaTv = now - pubStart;
|
||||
if ((deltaTv) > DDS_SECS (timeOut))
|
||||
{
|
||||
timedOut = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
dds_write_flush (writer);
|
||||
|
||||
printf ("=== [Publisher] %s, %llu samples written.\n", done ? "Terminated" : "Timed out", (unsigned long long) sample->count);
|
||||
fflush (stdout);
|
||||
}
|
||||
}
|
||||
|
||||
static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample)
|
||||
{
|
||||
dds_return_t status = dds_dispose (writer, &sample);
|
||||
if (status != DDS_RETCODE_TIMEOUT && status < 0)
|
||||
DDS_FATAL("dds_dispose: %s\n", dds_strretcode(-status));
|
||||
|
||||
dds_free (sample.payload._buffer);
|
||||
status = dds_delete (participant);
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
|
||||
}
|
99
examples/throughput/readme.rst
Normal file
99
examples/throughput/readme.rst
Normal file
|
@ -0,0 +1,99 @@
|
|||
..
|
||||
Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
|
||||
|
||||
This program and the accompanying materials are made available under the
|
||||
terms of the Eclipse Public License v. 2.0 which is available at
|
||||
http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
|
||||
v. 1.0 which is available at
|
||||
http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
|
||||
SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
||||
|
||||
Throughput
|
||||
==========
|
||||
|
||||
Description
|
||||
***********
|
||||
|
||||
The Throughput example allows the measurement of data throughput when receiving samples from a publisher.
|
||||
|
||||
|
||||
Design
|
||||
******
|
||||
|
||||
It consists of 2 units:
|
||||
|
||||
- Publisher: sends samples at a specified size and rate.
|
||||
- Subscriber: Receives samples and outputs statistics about throughput
|
||||
|
||||
Scenario
|
||||
********
|
||||
|
||||
The **publisher** sends samples and allows you to specify a payload size in bytes as well as allowing you to specify whether
|
||||
to send data in bursts. The **publisher** will continue to send data forever unless a time-out is specified.
|
||||
|
||||
Configurable:
|
||||
|
||||
- payloadSize: the size of the payload in bytes
|
||||
- burstInterval: the time interval between each burst in ms
|
||||
- burstSize: the number of samples to send each burst
|
||||
- timeOut: the number of seconds the publisher should run for (0=infinite)
|
||||
- partitionName: the name of the partition
|
||||
|
||||
The **subscriber** will receive data and output the total amount received and the data-rate in bytes-per-second. It will
|
||||
also indicate if any samples were received out-of-order. A maximum number of cycles can be specified and once this has
|
||||
been reached the subscriber will terminate and output totals and averages.
|
||||
|
||||
The **subscriber** executable measures:
|
||||
|
||||
- transferred: the total amount of data transferred in bytes.
|
||||
- outOfOrder: the number of samples that were received out of order.
|
||||
- transfer rate: the data transfer rate in bytes per second.
|
||||
- subscriber also calculates statistics on these values over a configurable number of cycles.
|
||||
|
||||
Configurable:
|
||||
|
||||
- maxCycles: the number of times to output statistics before terminating
|
||||
- pollingDelay
|
||||
- partitionName: the name of the partition
|
||||
|
||||
|
||||
Running the example
|
||||
*******************
|
||||
|
||||
It is recommended that you run ping and pong in separate terminals to avoid mixing the output.
|
||||
|
||||
- Open 2 terminals.
|
||||
- In the first terminal start Publisher by running publisher
|
||||
|
||||
publisher usage (parameters must be supplied in order):
|
||||
``./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
|
||||
defaults:
|
||||
``./publisher 8192 0 1 0 "Throughput example"``
|
||||
|
||||
- In the second terminal start Ping by running subscriber
|
||||
|
||||
subscriber usage (parameters must be supplied in order):
|
||||
``./subscriber [maxCycles (0=infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``
|
||||
defaults:
|
||||
``./subscriber 0 0 "Throughput example"``
|
||||
|
||||
- To achieve optimal performance it is recommended to set the CPU affinity so that ping and pong run on separate CPU cores,
|
||||
and use real-time scheduling. In a Linux environment this can be achieved as follows:
|
||||
|
||||
publisher usage:
|
||||
``taskset -c 0 chrt -f 80 ./publisher [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
|
||||
subscriber usage:
|
||||
``taskset -c 1 chrt -f 80 ./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``
|
||||
|
||||
On Windows the CPU affinity and prioritized scheduling class can be set as follows:
|
||||
|
||||
publisher usage:
|
||||
``START /affinity 1 /high cmd /k "publisher.exe" [payloadSize (bytes)] [burstInterval (ms)] [burstSize (samples)] [timeOut (seconds)] [partitionName]``
|
||||
subscriber usage:
|
||||
``START /affinity 2 /high cmd /k "subscriber.exe" [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]``
|
||||
|
||||
|
||||
|
||||
|
||||
|
400
examples/throughput/subscriber.c
Normal file
400
examples/throughput/subscriber.c
Normal file
|
@ -0,0 +1,400 @@
|
|||
#include "dds/dds.h"
|
||||
#include "Throughput.h"
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <signal.h>
|
||||
#include <assert.h>
|
||||
/*
|
||||
* The Throughput example measures data throughput in bytes per second. The publisher
|
||||
* allows you to specify a payload size in bytes as well as allowing you to specify
|
||||
* whether to send data in bursts. The publisher will continue to send data forever
|
||||
* unless a time out is specified. The subscriber will receive data and output the
|
||||
* total amount received and the data rate in bytes per second. It will also indicate
|
||||
* if any samples were received out of order. A maximum number of cycles can be
|
||||
* specified and once this has been reached the subscriber will terminate and output
|
||||
* totals and averages.
|
||||
*/
|
||||
|
||||
#define BYTES_PER_SEC_TO_MEGABITS_PER_SEC 125000
|
||||
#define MAX_SAMPLES 1000
|
||||
|
||||
typedef struct HandleEntry
|
||||
{
|
||||
dds_instance_handle_t handle;
|
||||
unsigned long long count;
|
||||
struct HandleEntry * next;
|
||||
} HandleEntry;
|
||||
|
||||
typedef struct HandleMap
|
||||
{
|
||||
HandleEntry *entries;
|
||||
} HandleMap;
|
||||
|
||||
static long pollingDelay = -1; /* i.e. use a listener */
|
||||
|
||||
static HandleMap * imap;
|
||||
static unsigned long long outOfOrder = 0;
|
||||
static unsigned long long total_bytes = 0;
|
||||
static unsigned long long total_samples = 0;
|
||||
|
||||
static dds_time_t startTime = 0;
|
||||
|
||||
static unsigned long payloadSize = 0;
|
||||
|
||||
static ThroughputModule_DataType data [MAX_SAMPLES];
|
||||
static void * samples[MAX_SAMPLES];
|
||||
|
||||
static dds_entity_t waitSet;
|
||||
|
||||
static volatile sig_atomic_t done = false;
|
||||
|
||||
/* Forward declarations */
|
||||
static HandleMap * HandleMap__alloc (void);
|
||||
static void HandleMap__free (HandleMap *map);
|
||||
static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key);
|
||||
static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key);
|
||||
|
||||
static void data_available_handler (dds_entity_t reader, void *arg);
|
||||
static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName);
|
||||
static void process_samples(dds_entity_t reader, unsigned long long maxCycles);
|
||||
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName);
|
||||
static void finalize_dds(dds_entity_t participant);
|
||||
|
||||
static void sigint (int sig)
|
||||
{
|
||||
(void) sig;
|
||||
done = true;
|
||||
}
|
||||
|
||||
int main (int argc, char **argv)
|
||||
{
|
||||
unsigned long long maxCycles = 0;
|
||||
char *partitionName = "Throughput example";
|
||||
|
||||
dds_entity_t participant;
|
||||
dds_entity_t reader;
|
||||
|
||||
if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
|
||||
{
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
printf ("Cycles: %llu | PollingDelay: %ld | Partition: %s\n", maxCycles, pollingDelay, partitionName);
|
||||
fflush (stdout);
|
||||
|
||||
participant = prepare_dds(&reader, partitionName);
|
||||
|
||||
printf ("=== [Subscriber] Waiting for samples...\n");
|
||||
fflush (stdout);
|
||||
|
||||
/* Process samples until Ctrl-C is pressed or until maxCycles */
|
||||
/* has been reached (0 = infinite) */
|
||||
signal (SIGINT, sigint);
|
||||
process_samples(reader, maxCycles);
|
||||
|
||||
dds_set_status_mask (reader, 0);
|
||||
HandleMap__free (imap);
|
||||
finalize_dds (participant);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* This struct contains all of the entities used in the publisher and subscriber.
|
||||
*/
|
||||
static HandleMap * HandleMap__alloc (void)
|
||||
{
|
||||
HandleMap * map = malloc (sizeof (*map));
|
||||
assert(map);
|
||||
memset (map, 0, sizeof (*map));
|
||||
return map;
|
||||
}
|
||||
|
||||
static void HandleMap__free (HandleMap *map)
|
||||
{
|
||||
HandleEntry * entry;
|
||||
|
||||
while (map->entries)
|
||||
{
|
||||
entry = map->entries;
|
||||
map->entries = entry->next;
|
||||
free (entry);
|
||||
}
|
||||
free (map);
|
||||
}
|
||||
|
||||
static HandleEntry * store_handle (HandleMap *map, dds_instance_handle_t key)
|
||||
{
|
||||
HandleEntry * entry = malloc (sizeof (*entry));
|
||||
assert(entry);
|
||||
memset (entry, 0, sizeof (*entry));
|
||||
|
||||
entry->handle = key;
|
||||
entry->next = map->entries;
|
||||
map->entries = entry;
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key)
|
||||
{
|
||||
HandleEntry * entry = map->entries;
|
||||
|
||||
while (entry)
|
||||
{
|
||||
if (entry->handle == key)
|
||||
{
|
||||
break;
|
||||
}
|
||||
entry = entry->next;
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
static int do_take (dds_entity_t reader)
|
||||
{
|
||||
int samples_received;
|
||||
dds_sample_info_t info [MAX_SAMPLES];
|
||||
dds_instance_handle_t ph = 0;
|
||||
HandleEntry * current = NULL;
|
||||
|
||||
if (startTime == 0)
|
||||
{
|
||||
startTime = dds_time ();
|
||||
}
|
||||
|
||||
/* Take samples and iterate through them */
|
||||
|
||||
samples_received = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES);
|
||||
if (samples_received < 0)
|
||||
DDS_FATAL("dds_take: %s\n", dds_strretcode(-samples_received));
|
||||
|
||||
for (int i = 0; !done && i < samples_received; i++)
|
||||
{
|
||||
if (info[i].valid_data)
|
||||
{
|
||||
ph = info[i].publication_handle;
|
||||
current = retrieve_handle (imap, ph);
|
||||
ThroughputModule_DataType * this_sample = &data[i];
|
||||
|
||||
if (current == NULL)
|
||||
{
|
||||
current = store_handle (imap, ph);
|
||||
current->count = this_sample->count;
|
||||
}
|
||||
|
||||
if (this_sample->count != current->count)
|
||||
{
|
||||
outOfOrder++;
|
||||
}
|
||||
current->count = this_sample->count + 1;
|
||||
|
||||
/* Add the sample payload size to the total received */
|
||||
|
||||
payloadSize = this_sample->payload._length;
|
||||
total_bytes += payloadSize + 8;
|
||||
total_samples++;
|
||||
}
|
||||
}
|
||||
return samples_received;
|
||||
}
|
||||
|
||||
static void data_available_handler (dds_entity_t reader, void *arg)
|
||||
{
|
||||
(void)arg;
|
||||
do_take (reader);
|
||||
}
|
||||
|
||||
static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName)
|
||||
{
|
||||
/*
|
||||
* Get the program parameters
|
||||
* Parameters: subscriber [maxCycles] [pollingDelay] [partitionName]
|
||||
*/
|
||||
if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
|
||||
{
|
||||
printf ("Usage (parameters must be supplied in order):\n");
|
||||
printf ("./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = waitset, -1 = listener)] [partitionName]\n");
|
||||
printf ("Defaults:\n");
|
||||
printf ("./subscriber 0 0 \"Throughput example\"\n");
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
if (argc > 1)
|
||||
{
|
||||
*maxCycles = (unsigned long long) atoi (argv[1]); /* The number of times to output statistics before terminating */
|
||||
}
|
||||
if (argc > 2)
|
||||
{
|
||||
pollingDelay = atoi (argv[2]); /* The number of ms to wait between reads (0 = waitset, -1 = listener) */
|
||||
}
|
||||
if (argc > 3)
|
||||
{
|
||||
*partitionName = argv[3]; /* The name of the partition */
|
||||
}
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
|
||||
{
|
||||
dds_return_t status;
|
||||
unsigned long long prev_bytes = 0;
|
||||
unsigned long long prev_samples = 0;
|
||||
dds_attach_t wsresults[2];
|
||||
dds_time_t deltaTv;
|
||||
bool first_batch = true;
|
||||
unsigned long cycles = 0;
|
||||
double deltaTime = 0;
|
||||
dds_time_t prev_time = 0;
|
||||
dds_time_t time_now = 0;
|
||||
|
||||
while (!done && (maxCycles == 0 || cycles < maxCycles))
|
||||
{
|
||||
if (pollingDelay > 0)
|
||||
dds_sleepfor (DDS_MSECS (pollingDelay));
|
||||
else
|
||||
{
|
||||
status = dds_waitset_wait (waitSet, wsresults, sizeof(wsresults)/sizeof(wsresults[0]), DDS_MSECS(100));
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_waitset_wait: %s\n", dds_strretcode(-status));
|
||||
}
|
||||
|
||||
if (pollingDelay >= 0)
|
||||
{
|
||||
while (do_take (reader))
|
||||
;
|
||||
}
|
||||
|
||||
time_now = dds_time();
|
||||
if (!first_batch)
|
||||
{
|
||||
deltaTv = time_now - prev_time;
|
||||
deltaTime = (double) deltaTv / DDS_NSECS_IN_SEC;
|
||||
|
||||
if (deltaTime >= 1.0 && total_samples != prev_samples)
|
||||
{
|
||||
printf ("=== [Subscriber] %5.3f Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
|
||||
"Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
|
||||
deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
|
||||
(deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
|
||||
(deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
|
||||
fflush (stdout);
|
||||
cycles++;
|
||||
prev_time = time_now;
|
||||
prev_bytes = total_bytes;
|
||||
prev_samples = total_samples;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
prev_time = time_now;
|
||||
first_batch = false;
|
||||
}
|
||||
}
|
||||
|
||||
/* Output totals and averages */
|
||||
deltaTv = time_now - startTime;
|
||||
deltaTime = (double) (deltaTv / DDS_NSECS_IN_SEC);
|
||||
printf ("\nTotal received: %llu samples, %llu bytes\n", total_samples, total_bytes);
|
||||
printf ("Out of order: %llu samples\n", outOfOrder);
|
||||
printf ("Average transfer rate: %.2lf samples/s, ", (double)total_samples / deltaTime);
|
||||
printf ("%.2lf Mbit/s\n", (double)(total_bytes / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
|
||||
fflush (stdout);
|
||||
}
|
||||
|
||||
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
|
||||
{
|
||||
dds_return_t status;
|
||||
dds_entity_t topic;
|
||||
dds_entity_t subscriber;
|
||||
dds_listener_t *rd_listener;
|
||||
dds_entity_t participant;
|
||||
|
||||
int32_t maxSamples = 4000;
|
||||
const char *subParts[1];
|
||||
dds_qos_t *subQos = dds_create_qos ();
|
||||
dds_qos_t *drQos = dds_create_qos ();
|
||||
|
||||
/* A Participant is created for the default domain. */
|
||||
|
||||
participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
|
||||
if (participant < 0)
|
||||
DDS_FATAL("dds_create_particpant: %s\n", dds_strretcode(-participant));
|
||||
|
||||
/* A Topic is created for our sample type on the domain participant. */
|
||||
|
||||
topic = dds_create_topic (participant, &ThroughputModule_DataType_desc, "Throughput", NULL, NULL);
|
||||
if (topic < 0)
|
||||
DDS_FATAL("dds_create_topic: %s\n", dds_strretcode(-topic));
|
||||
|
||||
/* A Subscriber is created on the domain participant. */
|
||||
|
||||
subParts[0] = partitionName;
|
||||
dds_qset_partition (subQos, 1, subParts);
|
||||
subscriber = dds_create_subscriber (participant, subQos, NULL);
|
||||
if (subscriber < 0)
|
||||
DDS_FATAL("dds_create_subscriber: %s\n", dds_strretcode(-subscriber));
|
||||
dds_delete_qos (subQos);
|
||||
|
||||
/* A Reader is created on the Subscriber & Topic with a modified Qos. */
|
||||
|
||||
dds_qset_reliability (drQos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
|
||||
dds_qset_history (drQos, DDS_HISTORY_KEEP_ALL, 0);
|
||||
dds_qset_resource_limits (drQos, maxSamples, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
|
||||
|
||||
rd_listener = dds_create_listener(NULL);
|
||||
dds_lset_data_available(rd_listener, data_available_handler);
|
||||
|
||||
/* A Read Condition is created which is triggered when data is available to read */
|
||||
waitSet = dds_create_waitset (participant);
|
||||
if (waitSet < 0)
|
||||
DDS_FATAL("dds_create_waitset: %s\n", dds_strretcode(-waitSet));
|
||||
|
||||
status = dds_waitset_attach (waitSet, waitSet, waitSet);
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
|
||||
|
||||
imap = HandleMap__alloc ();
|
||||
|
||||
memset (data, 0, sizeof (data));
|
||||
for (unsigned int i = 0; i < MAX_SAMPLES; i++)
|
||||
{
|
||||
samples[i] = &data[i];
|
||||
}
|
||||
|
||||
*reader = dds_create_reader (subscriber, topic, drQos, pollingDelay < 0 ? rd_listener : NULL);
|
||||
if (*reader < 0)
|
||||
DDS_FATAL("dds_create_reader: %s\n", dds_strretcode(-*reader));
|
||||
|
||||
if (pollingDelay == 0)
|
||||
{
|
||||
status = dds_waitset_attach (waitSet, *reader, *reader);
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_waitset_attach: %s\n", dds_strretcode(-status));
|
||||
}
|
||||
|
||||
dds_delete_qos (drQos);
|
||||
dds_delete_listener(rd_listener);
|
||||
|
||||
return participant;
|
||||
}
|
||||
|
||||
static void finalize_dds(dds_entity_t participant)
|
||||
{
|
||||
dds_return_t status;
|
||||
|
||||
for (unsigned int i = 0; i < MAX_SAMPLES; i++)
|
||||
{
|
||||
ThroughputModule_DataType_free (&data[i], DDS_FREE_CONTENTS);
|
||||
}
|
||||
|
||||
status = dds_waitset_detach (waitSet, waitSet);
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_waitset_detach: %s\n", dds_strretcode(-status));
|
||||
status = dds_delete (waitSet);
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
|
||||
status = dds_delete (participant);
|
||||
if (status < 0)
|
||||
DDS_FATAL("dds_delete: %s\n", dds_strretcode(-status));
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue