diff --git a/src/examples/roundtrip/ping.c b/src/examples/roundtrip/ping.c index e94c277..7e39d70 100644 --- a/src/examples/roundtrip/ping.c +++ b/src/examples/roundtrip/ping.c @@ -12,7 +12,7 @@ /* Forward declaration */ -static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond); +static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener); static void finalize_dds(dds_entity_t participant, dds_entity_t reader, dds_entity_t readCond); typedef struct ExampleTimeStats @@ -101,6 +101,12 @@ static double exampleGetMedianFromTimeStats (ExampleTimeStats *stats) return median; } +static dds_time_t exampleGet99PercentileFromTimeStats (ExampleTimeStats *stats) +{ + qsort (stats->values, stats->valuesSize, sizeof (dds_time_t), exampleCompareul); + return stats->values[stats->valuesSize - stats->valuesSize / 100]; +} + static dds_entity_t waitSet; #ifdef _WIN32 @@ -117,44 +123,124 @@ static void CtrlHandler (int fdwCtrlType) } #endif +static dds_entity_t writer; +static dds_entity_t reader; +static dds_entity_t participant; +static dds_entity_t readCond; + +static ExampleTimeStats roundTrip; +static ExampleTimeStats writeAccess; +static ExampleTimeStats readAccess; +static ExampleTimeStats roundTripOverall; +static ExampleTimeStats writeAccessOverall; +static ExampleTimeStats readAccessOverall; + +static RoundTripModule_DataType pub_data; +static RoundTripModule_DataType sub_data[MAX_SAMPLES]; +static void *samples[MAX_SAMPLES]; +static dds_sample_info_t info[MAX_SAMPLES]; + +static dds_time_t startTime; +static dds_time_t preWriteTime; +static dds_time_t postWriteTime; +static dds_time_t preTakeTime; +static dds_time_t postTakeTime; +static dds_time_t elapsed = 0; + +static bool warmUp = true; + +static void data_available(dds_entity_t reader, void *arg) +{ + dds_time_t difference = 0; + int status; + (void)arg; + /* Take sample and check that it is valid */ + preTakeTime = dds_time (); + status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES); + DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + postTakeTime = dds_time (); + + /* Update stats */ + difference = (postWriteTime - preWriteTime)/DDS_NSECS_IN_USEC; + writeAccess = *exampleAddTimingToTimeStats (&writeAccess, difference); + writeAccessOverall = *exampleAddTimingToTimeStats (&writeAccessOverall, difference); + + difference = (postTakeTime - preTakeTime)/DDS_NSECS_IN_USEC; + readAccess = *exampleAddTimingToTimeStats (&readAccess, difference); + readAccessOverall = *exampleAddTimingToTimeStats (&readAccessOverall, difference); + + difference = (postTakeTime - info[0].source_timestamp)/DDS_NSECS_IN_USEC; + roundTrip = *exampleAddTimingToTimeStats (&roundTrip, difference); + roundTripOverall = *exampleAddTimingToTimeStats (&roundTripOverall, difference); + + if (!warmUp) { + /* Print stats each second */ + difference = (postTakeTime - startTime)/DDS_NSECS_IN_USEC; + if (difference > US_IN_ONE_SEC) + { + printf("%9" PRIi64 " %9lu %8.0f %8" PRIi64 " %8" PRIi64 " %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n", + elapsed + 1, + roundTrip.count, + exampleGetMedianFromTimeStats (&roundTrip), + roundTrip.min, + exampleGet99PercentileFromTimeStats (&roundTrip), + roundTrip.max, + writeAccess.count, + exampleGetMedianFromTimeStats (&writeAccess), + writeAccess.min, + readAccess.count, + exampleGetMedianFromTimeStats (&readAccess), + readAccess.min); + + exampleResetTimeStats (&roundTrip); + exampleResetTimeStats (&writeAccess); + exampleResetTimeStats (&readAccess); + startTime = dds_time (); + elapsed++; + } + } + + preWriteTime = dds_time(); + status = dds_write_ts (writer, &pub_data, preWriteTime); + DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + postWriteTime = dds_time(); +} + +static void usage(void) +{ + printf ("Usage (parameters must be supplied in order):\n" + "./ping [-l] [payloadSize (bytes, 0 - 100M)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]\n" + "./ping quit - ping sends a quit signal to pong.\n" + "Defaults:\n" + "./ping 0 0 0\n"); + exit(EXIT_FAILURE); +} + int main (int argc, char *argv[]) { - dds_entity_t writer; - dds_entity_t reader; - dds_entity_t participant; - dds_entity_t readCond; - - ExampleTimeStats roundTrip; - ExampleTimeStats writeAccess; - ExampleTimeStats readAccess; - ExampleTimeStats roundTripOverall; - ExampleTimeStats writeAccessOverall; - ExampleTimeStats readAccessOverall; - unsigned long payloadSize = 0; unsigned long long numSamples = 0; bool invalidargs = false; dds_time_t timeOut = 0; - dds_time_t startTime; dds_time_t time; - dds_time_t preWriteTime; - dds_time_t postWriteTime; - dds_time_t preTakeTime; - dds_time_t postTakeTime; dds_time_t difference = 0; - dds_time_t elapsed = 0; - - RoundTripModule_DataType pub_data; - RoundTripModule_DataType sub_data[MAX_SAMPLES]; - void *samples[MAX_SAMPLES]; - dds_sample_info_t info[MAX_SAMPLES]; dds_attach_t wsresults[1]; size_t wsresultsize = 1U; dds_time_t waitTimeout = DDS_SECS (1); unsigned long i; int status; - bool warmUp = true; + + dds_listener_t *listener = NULL; + bool use_listener = false; + int argidx = 1; + + /* poor man's getopt works even on Windows */ + if (argc > argidx && strcmp(argv[argidx], "-l") == 0) + { + argidx++; + use_listener = true; + } /* Register handler for Ctrl-C */ #ifdef _WIN32 @@ -182,11 +268,17 @@ int main (int argc, char *argv[]) samples[i] = &sub_data[i]; } - participant = prepare_dds(&writer, &reader, &readCond); + participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL); + DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + + listener = dds_listener_create(NULL); + dds_lset_data_available(listener, data_available); + + prepare_dds(&writer, &reader, &readCond, listener); setvbuf(stdout, NULL, _IONBF, 0); - if (argc == 2 && strcmp (argv[1], "quit") == 0) + if (argc - argidx == 1 && strcmp (argv[argidx], "quit") == 0) { printf ("Sending termination request.\n"); /* pong uses a waitset which is triggered by instance disposal, and @@ -202,36 +294,29 @@ int main (int argc, char *argv[]) goto done; } - if (argc == 1) + if (argc - argidx == 0) { invalidargs = true; } - if (argc >= 2) + if (argc - argidx >= 1) { - payloadSize = atol (argv[1]); + payloadSize = atol (argv[argidx]); - if (payloadSize > 65536) + if (payloadSize > 100 * 1048576) { invalidargs = true; } } - if (argc >= 3) + if (argc - argidx >= 2) { - numSamples = atol (argv[2]); + numSamples = atol (argv[argidx+1]); } - if (argc >= 4) + if (argc - argidx >= 3) { - timeOut = atol (argv[3]); - } - if (invalidargs || (argc == 2 && (strcmp (argv[1], "-h") == 0 || strcmp (argv[1], "--help") == 0))) - { - printf ("Usage (parameters must be supplied in order):\n" - "./ping [payloadSize (bytes, 0 - 65536)] [numSamples (0 = infinite)] [timeOut (seconds, 0 = infinite)]\n" - "./ping quit - ping sends a quit signal to pong.\n" - "Defaults:\n" - "./ping 0 0 0\n"); - return EXIT_FAILURE; + timeOut = atol (argv[argidx+2]); } + if (invalidargs || (argc - argidx == 1 && (strcmp (argv[argidx], "-h") == 0 || strcmp (argv[argidx], "--help") == 0))) + usage(); printf ("# payloadSize: %lu | numSamples: %llu | timeOut: %" PRIi64 "\n\n", payloadSize, numSamples, timeOut); pub_data.payload._length = payloadSize; @@ -245,13 +330,12 @@ int main (int argc, char *argv[]) startTime = dds_time (); printf ("# Waiting for startup jitter to stabilise\n"); + /* Write a sample that pong can send back */ while (!dds_triggered (waitSet) && difference < DDS_SECS(5)) { - status = dds_write (writer, &pub_data); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - if (status > 0) /* data */ + if (status > 0 && listener == NULL) /* data */ { status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); @@ -266,93 +350,26 @@ int main (int argc, char *argv[]) printf("# Warm up complete.\n\n"); printf("# Round trip measurements (in us)\n"); - printf("# Round trip time [us] Write-access time [us] Read-access time [us]\n"); - printf("# Seconds Count median min Count median min Count median min\n"); + printf("# Round trip time [us] Write-access time [us] Read-access time [us]\n"); + printf("# Seconds Count median min 99%% max Count median min Count median min\n"); } + exampleResetTimeStats (&roundTrip); + exampleResetTimeStats (&writeAccess); + exampleResetTimeStats (&readAccess); startTime = dds_time (); + /* Write a sample that pong can send back */ + preWriteTime = dds_time (); + status = dds_write_ts (writer, &pub_data, preWriteTime); + DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + postWriteTime = dds_time (); for (i = 0; !dds_triggered (waitSet) && (!numSamples || i < numSamples); i++) { - /* Write a sample that pong can send back */ - preWriteTime = dds_time (); - status = dds_write (writer, &pub_data); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - postWriteTime = dds_time (); - - /* Wait for response from pong */ status = dds_waitset_wait (waitSet, wsresults, wsresultsize, waitTimeout); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - if (status != 0) - { - /* Take sample and check that it is valid */ - preTakeTime = dds_time (); - status = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - postTakeTime = dds_time (); - - if (!dds_triggered (waitSet)) - { - if (status != 1) - { - fprintf (stdout, "%s%d%s", "ERROR: Ping received ", status, - " samples but was expecting 1. Are multiple pong applications running?\n"); - - goto done; - } - else if (!info[0].valid_data) - { - printf ("ERROR: Ping received an invalid sample. Has pong terminated already?\n"); - goto done; - } - } - - /* Update stats */ - difference = (postWriteTime - preWriteTime)/DDS_NSECS_IN_USEC; - writeAccess = *exampleAddTimingToTimeStats (&writeAccess, difference); - writeAccessOverall = *exampleAddTimingToTimeStats (&writeAccessOverall, difference); - - difference = (postTakeTime - preTakeTime)/DDS_NSECS_IN_USEC; - readAccess = *exampleAddTimingToTimeStats (&readAccess, difference); - readAccessOverall = *exampleAddTimingToTimeStats (&readAccessOverall, difference); - - difference = (postTakeTime - preWriteTime)/DDS_NSECS_IN_USEC; - roundTrip = *exampleAddTimingToTimeStats (&roundTrip, difference); - roundTripOverall = *exampleAddTimingToTimeStats (&roundTripOverall, difference); - - /* Print stats each second */ - difference = (postTakeTime - startTime)/DDS_NSECS_IN_USEC; - if (difference > US_IN_ONE_SEC || (i && i == numSamples)) - { - printf - ( - "%9" PRIi64 " %9lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 " %10lu %8.0f %8" PRIi64 "\n", - elapsed + 1, - roundTrip.count, - exampleGetMedianFromTimeStats (&roundTrip), - roundTrip.min, - writeAccess.count, - exampleGetMedianFromTimeStats (&writeAccess), - writeAccess.min, - readAccess.count, - exampleGetMedianFromTimeStats (&readAccess), - readAccess.min - ); - - exampleResetTimeStats (&roundTrip); - exampleResetTimeStats (&writeAccess); - exampleResetTimeStats (&readAccess); - startTime = dds_time (); - elapsed++; - } - } - else - { - elapsed += waitTimeout / DDS_NSECS_IN_SEC; - } - if (timeOut && elapsed == timeOut) - { - dds_waitset_set_trigger (waitSet, true); + if (status != 0 && listener == NULL) { + data_available(reader, NULL); } } @@ -401,7 +418,7 @@ done: return EXIT_SUCCESS; } -static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond) +static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener) { dds_return_t status; dds_entity_t topic; @@ -415,9 +432,6 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_ dds_qos_t *drQos; dds_qos_t *subQos; - dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL); - DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - /* A DDS_Topic is created for our sample type on the domain participant. */ topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", NULL, NULL); DDS_ERR_CHECK (topic, DDS_CHECK_REPORT | DDS_CHECK_EXIT); @@ -449,15 +463,18 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_ /* A DDS_DataReader is created on the Subscriber & Topic with a modified QoS. */ drQos = dds_qos_create (); dds_qset_reliability (drQos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10)); - *reader = dds_create_reader (subscriber, topic, drQos, NULL); + *reader = dds_create_reader (subscriber, topic, drQos, listener); DDS_ERR_CHECK (*reader, DDS_CHECK_REPORT | DDS_CHECK_EXIT); dds_qos_delete (drQos); waitSet = dds_create_waitset (participant); - *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE); - - status = dds_waitset_attach (waitSet, *readCond, *reader); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + if (listener == NULL) { + *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE); + status = dds_waitset_attach (waitSet, *readCond, *reader); + DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + } else { + *readCond = 0; + } status = dds_waitset_attach (waitSet, waitSet, waitSet); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); @@ -471,12 +488,10 @@ static void finalize_dds(dds_entity_t participant, dds_entity_t reader, dds_enti /* Disable callbacks */ dds_set_enabled_status (reader, 0); - status = dds_waitset_detach (waitSet, readCond); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + (void) dds_waitset_detach (waitSet, readCond); status = dds_waitset_detach (waitSet, waitSet); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - status = dds_delete (readCond); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + (void) dds_delete (readCond); status = dds_delete (waitSet); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); status = dds_delete (participant); diff --git a/src/examples/roundtrip/pong.c b/src/examples/roundtrip/pong.c index fa39cdd..371b072 100644 --- a/src/examples/roundtrip/pong.c +++ b/src/examples/roundtrip/pong.c @@ -10,7 +10,7 @@ static dds_entity_t waitSet; #define MAX_SAMPLES 10 /* Forward declarations */ -static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond); +static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener); static void finalize_dds(dds_entity_t participant, dds_entity_t readCond, RoundTripModule_DataType data[MAX_SAMPLES]); #ifdef _WIN32 @@ -27,21 +27,58 @@ static void CtrlHandler (int fdwCtrlType) } #endif +static RoundTripModule_DataType data[MAX_SAMPLES]; +static void * samples[MAX_SAMPLES]; +static dds_sample_info_t info[MAX_SAMPLES]; + +static dds_entity_t participant; +static dds_entity_t reader; +static dds_entity_t writer; +static dds_entity_t readCond; + +static void data_available(dds_entity_t reader, void *arg) +{ + int status, samplecount; + (void)arg; + samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES); + DDS_ERR_CHECK (samplecount, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + for (int j = 0; !dds_triggered (waitSet) && j < samplecount; j++) + { + /* If writer has been disposed terminate pong */ + + if (info[j].instance_state == DDS_IST_NOT_ALIVE_DISPOSED) + { + printf ("Received termination request. Terminating.\n"); + dds_waitset_set_trigger (waitSet, true); + break; + } + else if (info[j].valid_data) + { + /* If sample is valid, send it back to ping */ + RoundTripModule_DataType * valid_sample = &data[j]; + status = dds_write_ts (writer, valid_sample, info[j].source_timestamp); + DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + } + } +} + int main (int argc, char *argv[]) { dds_duration_t waitTimeout = DDS_INFINITY; unsigned int i; - int status, samplecount, j; + int status; dds_attach_t wsresults[1]; size_t wsresultsize = 1U; - dds_entity_t participant; - dds_entity_t reader; - dds_entity_t writer; - dds_entity_t readCond; - RoundTripModule_DataType data[MAX_SAMPLES]; - void * samples[MAX_SAMPLES]; - dds_sample_info_t info[MAX_SAMPLES]; + dds_listener_t *listener = NULL; + bool use_listener = false; + int argidx = 1; + + if (argc > argidx && strcmp(argv[argidx], "-l") == 0) + { + argidx++; + use_listener = true; + } /* Register handler for Ctrl-C */ @@ -55,8 +92,6 @@ int main (int argc, char *argv[]) sigaction (SIGINT, &sat, &oldAction); #endif - participant = prepare_dds(&writer, &reader, &readCond); - /* Initialize sample data */ memset (data, 0, sizeof (data)); for (i = 0; i < MAX_SAMPLES; i++) @@ -64,6 +99,17 @@ int main (int argc, char *argv[]) samples[i] = &data[i]; } + participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL); + DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + + if (use_listener) + { + listener = dds_listener_create(NULL); + dds_lset_data_available(listener, data_available); + } + + (void)prepare_dds(&writer, &reader, &readCond, listener); + while (!dds_triggered (waitSet)) { /* Wait for a sample from ping */ @@ -72,25 +118,8 @@ int main (int argc, char *argv[]) DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); /* Take samples */ - samplecount = dds_take (reader, samples, info, MAX_SAMPLES, MAX_SAMPLES); - DDS_ERR_CHECK (samplecount, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - for (j = 0; !dds_triggered (waitSet) && j < samplecount; j++) - { - /* If writer has been disposed terminate pong */ - - if (info[j].instance_state == DDS_IST_NOT_ALIVE_DISPOSED) - { - printf ("Received termination request. Terminating.\n"); - dds_waitset_set_trigger (waitSet, true); - break; - } - else if (info[j].valid_data) - { - /* If sample is valid, send it back to ping */ - RoundTripModule_DataType * valid_sample = &data[j]; - status = dds_write (writer, valid_sample); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - } + if (listener == NULL) { + data_available (reader, 0); } } @@ -108,12 +137,11 @@ int main (int argc, char *argv[]) static void finalize_dds(dds_entity_t participant, dds_entity_t readCond, RoundTripModule_DataType data[MAX_SAMPLES]) { - dds_return_t status = dds_waitset_detach (waitSet, readCond); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + dds_return_t status; + (void)dds_waitset_detach (waitSet, readCond); status = dds_waitset_detach (waitSet, waitSet); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - status = dds_delete (readCond); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + (void)dds_delete (readCond); status = dds_delete (waitSet); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); status = dds_delete (participant); @@ -125,7 +153,7 @@ static void finalize_dds(dds_entity_t participant, dds_entity_t readCond, RoundT } } -static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond) +static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_entity_t *readCond, dds_listener_t *listener) { const char *pubPartitions[] = { "pong" }; const char *subPartitions[] = { "ping" }; @@ -135,9 +163,6 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_ dds_entity_t topic; dds_return_t status; - dds_entity_t participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL); - DDS_ERR_CHECK (participant, DDS_CHECK_REPORT | DDS_CHECK_EXIT); - /* A DDS Topic is created for our sample type on the domain participant. */ topic = dds_create_topic (participant, &RoundTripModule_DataType_desc, "RoundTrip", NULL, NULL); @@ -174,14 +199,18 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, dds_entity_t *reader, dds_ qos = dds_qos_create (); dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10)); - *reader = dds_create_reader (subscriber, topic, qos, NULL); + *reader = dds_create_reader (subscriber, topic, qos, listener); DDS_ERR_CHECK (*reader, DDS_CHECK_REPORT | DDS_CHECK_EXIT); dds_qos_delete (qos); waitSet = dds_create_waitset (participant); - *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE); - status = dds_waitset_attach (waitSet, *readCond, *reader); - DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + if (listener == NULL) { + *readCond = dds_create_readcondition (*reader, DDS_ANY_STATE); + status = dds_waitset_attach (waitSet, *readCond, *reader); + DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT); + } else { + *readCond = 0; + } status = dds_waitset_attach (waitSet, waitSet, waitSet); DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);