add listener-, waitset- and polling-mode to thorughput subscriber

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-01-15 11:11:43 +01:00
parent 54b5bed8d2
commit cd02110af0

View file

@ -17,7 +17,7 @@
*/
#define BYTES_PER_SEC_TO_MEGABITS_PER_SEC 125000
#define MAX_SAMPLES 100
#define MAX_SAMPLES 1000
typedef struct HandleEntry
{
@ -31,7 +31,7 @@ typedef struct HandleMap
HandleEntry *entries;
} HandleMap;
static long pollingDelay = 0;
static long pollingDelay = -1; /* i.e. use a listener */
static HandleMap * imap;
static unsigned long long outOfOrder = 0;
@ -39,8 +39,6 @@ static unsigned long long total_bytes = 0;
static unsigned long long total_samples = 0;
static dds_time_t startTime = 0;
static dds_time_t time_now = 0;
static dds_time_t prev_time = 0;
static unsigned long payloadSize = 0;
@ -48,9 +46,8 @@ static ThroughputModule_DataType data [MAX_SAMPLES];
static void * samples[MAX_SAMPLES];
static dds_entity_t waitSet;
static dds_entity_t pollingWaitset;
static bool done = false;
static volatile sig_atomic_t done = false;
/* Forward declarations */
static HandleMap * HandleMap__alloc (void);
@ -60,7 +57,7 @@ static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key)
static void data_available_handler (dds_entity_t reader, void *arg);
static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName);
static void process_samples(unsigned long long maxCycles);
static void process_samples(dds_entity_t reader, unsigned long long maxCycles);
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName);
static void finalize_dds(dds_entity_t participant);
@ -91,9 +88,6 @@ int main (int argc, char **argv)
dds_entity_t participant;
dds_entity_t reader;
time_now = dds_time ();
prev_time = time_now;
/* Register handler for Ctrl-C */
#ifdef _WIN32
SetConsoleCtrlHandler((PHANDLER_ROUTINE) CtrlHandler, true);
@ -110,8 +104,7 @@ int main (int argc, char **argv)
return EXIT_FAILURE;
}
printf ("Cycles: %llu | PollingDelay: %lu | Partition: %s\n",
maxCycles, pollingDelay, partitionName);
printf ("Cycles: %llu | PollingDelay: %ld | Partition: %s\n", maxCycles, pollingDelay, partitionName);
participant = prepare_dds(&reader, partitionName);
@ -119,7 +112,7 @@ int main (int argc, char **argv)
/* Process samples until Ctrl-C is pressed or until maxCycles */
/* has been reached (0 = infinite) */
process_samples(maxCycles);
process_samples(reader, maxCycles);
/* Finished, disable callbacks */
dds_set_status_mask (reader, 0);
@ -189,13 +182,12 @@ static HandleEntry * retrieve_handle (HandleMap *map, dds_instance_handle_t key)
return entry;
}
static void data_available_handler (dds_entity_t reader, void *arg)
static int do_take (dds_entity_t reader)
{
int samples_received;
dds_sample_info_t info [MAX_SAMPLES];
dds_instance_handle_t ph = 0;
HandleEntry * current = NULL;
(void)arg;
if (startTime == 0)
{
@ -234,11 +226,13 @@ static void data_available_handler (dds_entity_t reader, void *arg)
total_samples++;
}
}
time_now = dds_time ();
if ((pollingDelay == 0) && (time_now > (prev_time + DDS_SECS (1))))
{
dds_waitset_set_trigger (pollingWaitset, true);
}
return samples_received;
}
static void data_available_handler (dds_entity_t reader, void *arg)
{
(void)arg;
do_take (reader);
}
static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char **partitionName)
@ -250,7 +244,7 @@ static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char
if (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))
{
printf ("Usage (parameters must be supplied in order):\n");
printf ("./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = event based)] [partitionName]\n");
printf ("./subscriber [maxCycles (0 = infinite)] [pollingDelay (ms, 0 = waitset, -1 = listener)] [partitionName]\n");
printf ("Defaults:\n");
printf ("./subscriber 0 0 \"Throughput example\"\n");
return EXIT_FAILURE;
@ -262,7 +256,7 @@ static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char
}
if (argc > 2)
{
pollingDelay = atoi (argv[2]); /* The number of ms to wait between reads (0 = event based) */
pollingDelay = atoi (argv[2]); /* The number of ms to wait between reads (0 = waitset, -1 = listener) */
}
if (argc > 3)
{
@ -271,61 +265,59 @@ static int parse_args(int argc, char **argv, unsigned long long *maxCycles, char
return EXIT_SUCCESS;
}
static void process_samples(unsigned long long maxCycles)
static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
{
dds_return_t status;
unsigned long long prev_bytes = 0;
unsigned long long prev_samples = 0;
dds_attach_t wsresults[1];
size_t wsresultsize = 1U;
dds_attach_t wsresults[2];
dds_time_t deltaTv;
bool first_batch = true;
unsigned long cycles = 0;
double deltaTime = 0;
dds_time_t prev_time = 0;
dds_time_t time_now = 0;
while (!done && (maxCycles == 0 || cycles < maxCycles))
{
if (pollingDelay)
if (pollingDelay > 0)
{
dds_sleepfor (DDS_MSECS (pollingDelay));
while (do_take (reader))
;
}
else
{
status = dds_waitset_wait (waitSet, wsresults, wsresultsize, DDS_INFINITY);
status = dds_waitset_wait (waitSet, wsresults, sizeof(wsresults)/sizeof(wsresults[0]), DDS_SECS(1));
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
if ((status > 0 ) && (dds_triggered (pollingWaitset)))
{
dds_waitset_set_trigger (pollingWaitset, false);
}
while (do_take (reader))
;
}
time_now = dds_time();
if (!first_batch)
{
deltaTv = time_now - prev_time;
deltaTime = (double) deltaTv / DDS_NSECS_IN_SEC;
prev_time = time_now;
printf
(
"=== [Subscriber] Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
"Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
payloadSize, total_samples, total_bytes, outOfOrder,
(deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
(deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0
);
cycles++;
if (deltaTime >= 1.0 && total_samples != prev_samples)
{
printf ("=== [Subscriber] %5.3f Payload size: %lu | Total received: %llu samples, %llu bytes | Out of order: %llu samples "
"Transfer rate: %.2lf samples/s, %.2lf Mbit/s\n",
deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
(deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
(deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
cycles++;
prev_time = time_now;
prev_bytes = total_bytes;
prev_samples = total_samples;
}
}
else
{
prev_time = time_now;
first_batch = false;
}
/* Update the previous values for next iteration */
prev_bytes = total_bytes;
prev_samples = total_samples;
}
/* Output totals and averages */
@ -345,7 +337,7 @@ static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
dds_listener_t *rd_listener;
dds_entity_t participant;
int32_t maxSamples = 400;
int32_t maxSamples = 4000;
const char *subParts[1];
dds_qos_t *subQos = dds_create_qos ();
dds_qos_t *drQos = dds_create_qos ();
@ -381,12 +373,6 @@ static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
waitSet = dds_create_waitset (participant);
DDS_ERR_CHECK (waitSet, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
pollingWaitset = dds_create_waitset (participant);
DDS_ERR_CHECK (pollingWaitset, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_waitset_attach (waitSet, pollingWaitset, pollingWaitset);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_waitset_attach (waitSet, waitSet, waitSet);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
@ -398,8 +384,15 @@ static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
samples[i] = &data[i];
}
*reader = dds_create_reader (subscriber, topic, drQos, rd_listener);
*reader = dds_create_reader (subscriber, topic, drQos, pollingDelay < 0 ? rd_listener : NULL);
DDS_ERR_CHECK (*reader, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
if (pollingDelay == 0)
{
status = dds_waitset_attach (waitSet, *reader, *reader);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
}
dds_delete_qos (drQos);
dds_delete_listener(rd_listener);
@ -417,10 +410,6 @@ static void finalize_dds(dds_entity_t participant)
status = dds_waitset_detach (waitSet, waitSet);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_waitset_detach (waitSet, pollingWaitset);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_delete (pollingWaitset);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_delete (waitSet);
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
status = dds_delete (participant);