using explicit fflush instead of setvbuf (#133)
Because line-buffering doesn't exist on Windows ... Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
959a096372
commit
c39cc74e13
6 changed files with 133 additions and 30 deletions
|
@ -31,6 +31,7 @@ int main (int argc, char ** argv)
|
||||||
DDS_FATAL("dds_create_write: %s\n", dds_strretcode(-writer));
|
DDS_FATAL("dds_create_write: %s\n", dds_strretcode(-writer));
|
||||||
|
|
||||||
printf("=== [Publisher] Waiting for a reader to be discovered ...\n");
|
printf("=== [Publisher] Waiting for a reader to be discovered ...\n");
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
|
rc = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
|
||||||
if (rc != DDS_RETCODE_OK)
|
if (rc != DDS_RETCODE_OK)
|
||||||
|
@ -52,6 +53,7 @@ int main (int argc, char ** argv)
|
||||||
|
|
||||||
printf ("=== [Publisher] Writing : ");
|
printf ("=== [Publisher] Writing : ");
|
||||||
printf ("Message (%d, %s)\n", msg.userID, msg.message);
|
printf ("Message (%d, %s)\n", msg.userID, msg.message);
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
rc = dds_write (writer, &msg);
|
rc = dds_write (writer, &msg);
|
||||||
if (rc != DDS_RETCODE_OK)
|
if (rc != DDS_RETCODE_OK)
|
||||||
|
|
|
@ -40,6 +40,7 @@ int main (int argc, char ** argv)
|
||||||
dds_delete_qos(qos);
|
dds_delete_qos(qos);
|
||||||
|
|
||||||
printf ("\n=== [Subscriber] Waiting for a sample ...\n");
|
printf ("\n=== [Subscriber] Waiting for a sample ...\n");
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
/* Initialize sample buffer, by pointing the void pointer within
|
/* Initialize sample buffer, by pointing the void pointer within
|
||||||
* the buffer array to a valid sample memory location. */
|
* the buffer array to a valid sample memory location. */
|
||||||
|
@ -61,6 +62,7 @@ int main (int argc, char ** argv)
|
||||||
msg = (HelloWorldData_Msg*) samples[0];
|
msg = (HelloWorldData_Msg*) samples[0];
|
||||||
printf ("=== [Subscriber] Received : ");
|
printf ("=== [Subscriber] Received : ");
|
||||||
printf ("Message (%d, %s)\n", msg->userID, msg->message);
|
printf ("Message (%d, %s)\n", msg->userID, msg->message);
|
||||||
|
fflush (stdout);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -193,6 +193,7 @@ static void data_available(dds_entity_t rd, void *arg)
|
||||||
readAccess.count,
|
readAccess.count,
|
||||||
exampleGetMedianFromTimeStats (&readAccess),
|
exampleGetMedianFromTimeStats (&readAccess),
|
||||||
readAccess.min);
|
readAccess.min);
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
exampleResetTimeStats (&roundTrip);
|
exampleResetTimeStats (&roundTrip);
|
||||||
exampleResetTimeStats (&writeAccess);
|
exampleResetTimeStats (&writeAccess);
|
||||||
|
@ -282,11 +283,10 @@ int main (int argc, char *argv[])
|
||||||
}
|
}
|
||||||
prepare_dds(&writer, &reader, &readCond, listener);
|
prepare_dds(&writer, &reader, &readCond, listener);
|
||||||
|
|
||||||
setvbuf(stdout, NULL, _IONBF, 0);
|
|
||||||
|
|
||||||
if (argc - argidx == 1 && strcmp (argv[argidx], "quit") == 0)
|
if (argc - argidx == 1 && strcmp (argv[argidx], "quit") == 0)
|
||||||
{
|
{
|
||||||
printf ("Sending termination request.\n");
|
printf ("Sending termination request.\n");
|
||||||
|
fflush (stdout);
|
||||||
/* pong uses a waitset which is triggered by instance disposal, and
|
/* pong uses a waitset which is triggered by instance disposal, and
|
||||||
quits when it fires. */
|
quits when it fires. */
|
||||||
dds_sleepfor (DDS_SECS (1));
|
dds_sleepfor (DDS_SECS (1));
|
||||||
|
@ -325,6 +325,7 @@ int main (int argc, char *argv[])
|
||||||
if (invalidargs || (argc - argidx == 1 && (strcmp (argv[argidx], "-h") == 0 || strcmp (argv[argidx], "--help") == 0)))
|
if (invalidargs || (argc - argidx == 1 && (strcmp (argv[argidx], "-h") == 0 || strcmp (argv[argidx], "--help") == 0)))
|
||||||
usage();
|
usage();
|
||||||
printf ("# payloadSize: %" PRIu32 " | numSamples: %" PRIu64 " | timeOut: %" PRIi64 "\n\n", payloadSize, numSamples, timeOut);
|
printf ("# payloadSize: %" PRIu32 " | numSamples: %" PRIu64 " | timeOut: %" PRIi64 "\n\n", payloadSize, numSamples, timeOut);
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
pub_data.payload._length = payloadSize;
|
pub_data.payload._length = payloadSize;
|
||||||
pub_data.payload._buffer = payloadSize ? dds_alloc (payloadSize) : NULL;
|
pub_data.payload._buffer = payloadSize ? dds_alloc (payloadSize) : NULL;
|
||||||
|
@ -337,6 +338,7 @@ int main (int argc, char *argv[])
|
||||||
|
|
||||||
startTime = dds_time ();
|
startTime = dds_time ();
|
||||||
printf ("# Waiting for startup jitter to stabilise\n");
|
printf ("# Waiting for startup jitter to stabilise\n");
|
||||||
|
fflush (stdout);
|
||||||
/* Write a sample that pong can send back */
|
/* Write a sample that pong can send back */
|
||||||
while (!dds_triggered (waitSet) && difference < DDS_SECS(5))
|
while (!dds_triggered (waitSet) && difference < DDS_SECS(5))
|
||||||
{
|
{
|
||||||
|
@ -358,11 +360,10 @@ int main (int argc, char *argv[])
|
||||||
{
|
{
|
||||||
warmUp = false;
|
warmUp = false;
|
||||||
printf("# Warm up complete.\n\n");
|
printf("# Warm up complete.\n\n");
|
||||||
|
|
||||||
printf("# Latency measurements (in us)\n");
|
printf("# Latency measurements (in us)\n");
|
||||||
printf("# Latency [us] Write-access time [us] Read-access time [us]\n");
|
printf("# Latency [us] Write-access time [us] Read-access time [us]\n");
|
||||||
printf("# Seconds Count median min 99%% max Count median min Count median min\n");
|
printf("# Seconds Count median min 99%% max Count median min Count median min\n");
|
||||||
|
fflush (stdout);
|
||||||
}
|
}
|
||||||
|
|
||||||
exampleResetTimeStats (&roundTrip);
|
exampleResetTimeStats (&roundTrip);
|
||||||
|
@ -403,6 +404,7 @@ int main (int argc, char *argv[])
|
||||||
exampleGetMedianFromTimeStats (&readAccessOverall),
|
exampleGetMedianFromTimeStats (&readAccessOverall),
|
||||||
readAccessOverall.min
|
readAccessOverall.min
|
||||||
);
|
);
|
||||||
|
fflush (stdout);
|
||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
|
|
|
@ -47,10 +47,6 @@ int main (int argc, char **argv)
|
||||||
dds_return_t rc;
|
dds_return_t rc;
|
||||||
ThroughputModule_DataType sample;
|
ThroughputModule_DataType sample;
|
||||||
|
|
||||||
#if !defined(_WIN32)
|
|
||||||
setvbuf (stdout, NULL, _IOLBF, 0);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
|
if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
}
|
}
|
||||||
|
@ -60,6 +56,7 @@ int main (int argc, char **argv)
|
||||||
/* Wait until have a reader */
|
/* Wait until have a reader */
|
||||||
if (wait_for_reader(writer, participant) == 0) {
|
if (wait_for_reader(writer, participant) == 0) {
|
||||||
printf ("=== [Publisher] Did not discover a reader.\n");
|
printf ("=== [Publisher] Did not discover a reader.\n");
|
||||||
|
fflush (stdout);
|
||||||
rc = dds_delete (participant);
|
rc = dds_delete (participant);
|
||||||
if (rc < 0)
|
if (rc < 0)
|
||||||
DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
|
DDS_FATAL("dds_delete: %s\n", dds_strretcode(-rc));
|
||||||
|
@ -131,6 +128,7 @@ static int parse_args(
|
||||||
|
|
||||||
printf ("payloadSize: %u bytes burstInterval: %u ms burstSize: %u timeOut: %u seconds partitionName: %s\n",
|
printf ("payloadSize: %u bytes burstInterval: %u ms burstSize: %u timeOut: %u seconds partitionName: %s\n",
|
||||||
*payloadSize, *burstInterval, *burstSize, *timeOut, *partitionName);
|
*payloadSize, *burstInterval, *burstSize, *timeOut, *partitionName);
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -182,6 +180,7 @@ static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName)
|
||||||
static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant)
|
static dds_return_t wait_for_reader(dds_entity_t writer, dds_entity_t participant)
|
||||||
{
|
{
|
||||||
printf ("\n=== [Publisher] Waiting for a reader ...\n");
|
printf ("\n=== [Publisher] Waiting for a reader ...\n");
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
dds_return_t rc;
|
dds_return_t rc;
|
||||||
dds_entity_t waitset;
|
dds_entity_t waitset;
|
||||||
|
@ -224,6 +223,7 @@ static void start_writing(
|
||||||
unsigned int burstCount = 0;
|
unsigned int burstCount = 0;
|
||||||
|
|
||||||
printf ("=== [Publisher] Writing samples...\n");
|
printf ("=== [Publisher] Writing samples...\n");
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
while (!done && !timedOut)
|
while (!done && !timedOut)
|
||||||
{
|
{
|
||||||
|
@ -277,14 +277,8 @@ static void start_writing(
|
||||||
}
|
}
|
||||||
dds_write_flush (writer);
|
dds_write_flush (writer);
|
||||||
|
|
||||||
if (done)
|
printf ("=== [Publisher] %s, %llu samples written.\n", done ? "Terminated" : "Timed out", (unsigned long long) sample->count);
|
||||||
{
|
fflush (stdout);
|
||||||
printf ("=== [Publisher] Terminated, %llu samples written.\n", (unsigned long long) sample->count);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
printf ("=== [Publisher] Timed out, %llu samples written.\n", (unsigned long long) sample->count);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,20 +75,18 @@ int main (int argc, char **argv)
|
||||||
dds_entity_t participant;
|
dds_entity_t participant;
|
||||||
dds_entity_t reader;
|
dds_entity_t reader;
|
||||||
|
|
||||||
#if !defined(_WIN32)
|
|
||||||
setvbuf (stdout, NULL, _IOLBF, 0);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
|
if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
|
||||||
{
|
{
|
||||||
return EXIT_FAILURE;
|
return EXIT_FAILURE;
|
||||||
}
|
}
|
||||||
|
|
||||||
printf ("Cycles: %llu | PollingDelay: %ld | Partition: %s\n", maxCycles, pollingDelay, partitionName);
|
printf ("Cycles: %llu | PollingDelay: %ld | Partition: %s\n", maxCycles, pollingDelay, partitionName);
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
participant = prepare_dds(&reader, partitionName);
|
participant = prepare_dds(&reader, partitionName);
|
||||||
|
|
||||||
printf ("=== [Subscriber] Waiting for samples...\n");
|
printf ("=== [Subscriber] Waiting for samples...\n");
|
||||||
|
fflush (stdout);
|
||||||
|
|
||||||
/* Process samples until Ctrl-C is pressed or until maxCycles */
|
/* Process samples until Ctrl-C is pressed or until maxCycles */
|
||||||
/* has been reached (0 = infinite) */
|
/* has been reached (0 = infinite) */
|
||||||
|
@ -280,6 +278,7 @@ static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
|
||||||
deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
|
deltaTime, payloadSize, total_samples, total_bytes, outOfOrder,
|
||||||
(deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
|
(deltaTime != 0.0) ? ((double)(total_samples - prev_samples) / deltaTime) : 0,
|
||||||
(deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
|
(deltaTime != 0.0) ? ((double)((total_bytes - prev_bytes) / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime) : 0);
|
||||||
|
fflush (stdout);
|
||||||
cycles++;
|
cycles++;
|
||||||
prev_time = time_now;
|
prev_time = time_now;
|
||||||
prev_bytes = total_bytes;
|
prev_bytes = total_bytes;
|
||||||
|
@ -300,6 +299,7 @@ static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
|
||||||
printf ("Out of order: %llu samples\n", outOfOrder);
|
printf ("Out of order: %llu samples\n", outOfOrder);
|
||||||
printf ("Average transfer rate: %.2lf samples/s, ", (double)total_samples / deltaTime);
|
printf ("Average transfer rate: %.2lf samples/s, ", (double)total_samples / deltaTime);
|
||||||
printf ("%.2lf Mbit/s\n", (double)(total_bytes / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
|
printf ("%.2lf Mbit/s\n", (double)(total_bytes / BYTES_PER_SEC_TO_MEGABITS_PER_SEC) / deltaTime);
|
||||||
|
fflush (stdout);
|
||||||
}
|
}
|
||||||
|
|
||||||
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
|
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName)
|
||||||
|
|
|
@ -62,6 +62,7 @@ enum readermode { MODE_PRINT, MODE_CHECK, MODE_ZEROLOAD, MODE_DUMP, MODE_NONE };
|
||||||
#define PM_STATE 512u
|
#define PM_STATE 512u
|
||||||
|
|
||||||
static volatile sig_atomic_t termflag = 0;
|
static volatile sig_atomic_t termflag = 0;
|
||||||
|
static int flushflag = 0;
|
||||||
static int pid;
|
static int pid;
|
||||||
static dds_entity_t termcond;
|
static dds_entity_t termcond;
|
||||||
static unsigned nkeyvals = 1;
|
static unsigned nkeyvals = 1;
|
||||||
|
@ -885,6 +886,9 @@ static void print_K(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const
|
||||||
} else
|
} else
|
||||||
printf ("get_key_value: error (%s)\n", dds_err_str(result));
|
printf ("get_key_value: error (%s)\n", dds_err_str(result));
|
||||||
}
|
}
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
ddsrt_mutex_unlock(&output_mutex);
|
ddsrt_mutex_unlock(&output_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -935,6 +939,9 @@ static void print_seq_OU(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd __
|
||||||
} else {
|
} else {
|
||||||
printf ("NA\n");
|
printf ("NA\n");
|
||||||
}
|
}
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
ddsrt_mutex_unlock(&output_mutex);
|
ddsrt_mutex_unlock(&output_mutex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -967,10 +974,16 @@ static void rd_on_liveliness_changed(dds_entity_t rd __attribute__ ((unused)), c
|
||||||
status.alive_count, status.alive_count_change,
|
status.alive_count, status.alive_count_change,
|
||||||
status.not_alive_count, status.not_alive_count_change,
|
status.not_alive_count, status.not_alive_count_change,
|
||||||
status.last_publication_handle);
|
status.last_publication_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rd_on_sample_lost(dds_entity_t rd __attribute__ ((unused)), const dds_sample_lost_status_t status, void* arg __attribute__ ((unused))) {
|
static void rd_on_sample_lost(dds_entity_t rd __attribute__ ((unused)), const dds_sample_lost_status_t status, void* arg __attribute__ ((unused))) {
|
||||||
printf ("[sample-lost: total=(%"PRIu32" change %"PRId32")]\n", status.total_count, status.total_count_change);
|
printf ("[sample-lost: total=(%"PRIu32" change %"PRId32")]\n", status.total_count, status.total_count_change);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rd_on_sample_rejected(dds_entity_t rd __attribute__ ((unused)), const dds_sample_rejected_status_t status, void* arg __attribute__ ((unused))) {
|
static void rd_on_sample_rejected(dds_entity_t rd __attribute__ ((unused)), const dds_sample_rejected_status_t status, void* arg __attribute__ ((unused))) {
|
||||||
|
@ -985,6 +998,9 @@ static void rd_on_sample_rejected(dds_entity_t rd __attribute__ ((unused)), cons
|
||||||
status.total_count, status.total_count_change,
|
status.total_count, status.total_count_change,
|
||||||
reasonstr,
|
reasonstr,
|
||||||
status.last_instance_handle);
|
status.last_instance_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rd_on_subscription_matched(dds_entity_t rd __attribute__((unused)), const dds_subscription_matched_status_t status, void* arg __attribute__((unused))) {
|
static void rd_on_subscription_matched(dds_entity_t rd __attribute__((unused)), const dds_subscription_matched_status_t status, void* arg __attribute__((unused))) {
|
||||||
|
@ -992,12 +1008,18 @@ static void rd_on_subscription_matched(dds_entity_t rd __attribute__((unused)),
|
||||||
status.total_count, status.total_count_change,
|
status.total_count, status.total_count_change,
|
||||||
status.current_count, status.current_count_change,
|
status.current_count, status.current_count_change,
|
||||||
status.last_publication_handle);
|
status.last_publication_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rd_on_requested_deadline_missed(dds_entity_t rd __attribute__((unused)), const dds_requested_deadline_missed_status_t status, void* arg __attribute__ ((unused))) {
|
static void rd_on_requested_deadline_missed(dds_entity_t rd __attribute__((unused)), const dds_requested_deadline_missed_status_t status, void* arg __attribute__ ((unused))) {
|
||||||
printf ("[requested-deadline-missed: total=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
|
printf ("[requested-deadline-missed: total=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
|
||||||
status.total_count, status.total_count_change,
|
status.total_count, status.total_count_change,
|
||||||
status.last_instance_handle);
|
status.last_instance_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char *policystr(uint32_t id) {
|
static const char *policystr(uint32_t id) {
|
||||||
|
@ -1048,21 +1070,33 @@ static const char *policystr(uint32_t id) {
|
||||||
static void rd_on_requested_incompatible_qos(dds_entity_t rd __attribute__((unused)), const dds_requested_incompatible_qos_status_t status, void* arg __attribute__((unused))) {
|
static void rd_on_requested_incompatible_qos(dds_entity_t rd __attribute__((unused)), const dds_requested_incompatible_qos_status_t status, void* arg __attribute__((unused))) {
|
||||||
printf ("[requested-incompatible-qos: total=(%"PRIu32" change %"PRId32") last_policy=%s]\n",
|
printf ("[requested-incompatible-qos: total=(%"PRIu32" change %"PRId32") last_policy=%s]\n",
|
||||||
status.total_count, status.total_count_change, policystr(status.last_policy_id));
|
status.total_count, status.total_count_change, policystr(status.last_policy_id));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void wr_on_offered_incompatible_qos(dds_entity_t wr __attribute__((unused)), const dds_offered_incompatible_qos_status_t status, void* arg __attribute__((unused))) {
|
static void wr_on_offered_incompatible_qos(dds_entity_t wr __attribute__((unused)), const dds_offered_incompatible_qos_status_t status, void* arg __attribute__((unused))) {
|
||||||
printf ("[offered-incompatible-qos: total=(%"PRIu32" change %"PRId32") last_policy=%s]\n",
|
printf ("[offered-incompatible-qos: total=(%"PRIu32" change %"PRId32") last_policy=%s]\n",
|
||||||
status.total_count, status.total_count_change, policystr(status.last_policy_id));
|
status.total_count, status.total_count_change, policystr(status.last_policy_id));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void wr_on_liveliness_lost(dds_entity_t wr __attribute__((unused)), const dds_liveliness_lost_status_t status, void* arg __attribute__ ((unused))) {
|
static void wr_on_liveliness_lost(dds_entity_t wr __attribute__((unused)), const dds_liveliness_lost_status_t status, void* arg __attribute__ ((unused))) {
|
||||||
printf ("[liveliness-lost: total=(%"PRIu32" change %"PRId32")]\n",
|
printf ("[liveliness-lost: total=(%"PRIu32" change %"PRId32")]\n",
|
||||||
status.total_count, status.total_count_change);
|
status.total_count, status.total_count_change);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void wr_on_offered_deadline_missed(dds_entity_t wr __attribute__((unused)), const dds_offered_deadline_missed_status_t status, void* arg __attribute__((unused))) {
|
static void wr_on_offered_deadline_missed(dds_entity_t wr __attribute__((unused)), const dds_offered_deadline_missed_status_t status, void* arg __attribute__((unused))) {
|
||||||
printf ("[offered-deadline-missed: total=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
|
printf ("[offered-deadline-missed: total=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
|
||||||
status.total_count, status.total_count_change, status.last_instance_handle);
|
status.total_count, status.total_count_change, status.last_instance_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void wr_on_publication_matched(dds_entity_t wr __attribute__((unused)), const dds_publication_matched_status_t status, void* arg __attribute__((unused))) {
|
static void wr_on_publication_matched(dds_entity_t wr __attribute__((unused)), const dds_publication_matched_status_t status, void* arg __attribute__((unused))) {
|
||||||
|
@ -1070,6 +1104,9 @@ static void wr_on_publication_matched(dds_entity_t wr __attribute__((unused)), c
|
||||||
status.total_count, status.total_count_change,
|
status.total_count, status.total_count_change,
|
||||||
status.current_count, status.current_count_change,
|
status.current_count, status.current_count_change,
|
||||||
status.last_subscription_handle);
|
status.last_subscription_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int register_instance_wrapper(dds_entity_t wr, const void *d, const dds_time_t tstamp) {
|
static int register_instance_wrapper(dds_entity_t wr, const void *d, const dds_time_t tstamp) {
|
||||||
|
@ -1105,6 +1142,9 @@ static void non_data_operation(char command, dds_entity_t wr) {
|
||||||
switch (command) {
|
switch (command) {
|
||||||
case 'Y':
|
case 'Y':
|
||||||
printf ("Dispose all: not supported\n");
|
printf ("Dispose all: not supported\n");
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
// TODO Implement application side tracking of alive instances for use with a 'dispose all' function
|
// TODO Implement application side tracking of alive instances for use with a 'dispose all' function
|
||||||
// if ((result = DDS_Topic_dispose_all_data(DDS_DataWriter_get_topic(wr))) != DDS_RETCODE_OK)
|
// if ((result = DDS_Topic_dispose_all_data(DDS_DataWriter_get_topic(wr))) != DDS_RETCODE_OK)
|
||||||
// error ("DDS_Topic_dispose_all: error %d\n", (int) result);
|
// error ("DDS_Topic_dispose_all: error %d\n", (int) result);
|
||||||
|
@ -1207,6 +1247,9 @@ static void pub_do_auto(const struct writerspec *spec) {
|
||||||
while (!termflag && tprev < tstop) {
|
while (!termflag && tprev < tstop) {
|
||||||
if ((result = dds_write(spec->wr, &d)) != DDS_RETCODE_OK) {
|
if ((result = dds_write(spec->wr, &d)) != DDS_RETCODE_OK) {
|
||||||
printf ("write: error %d (%s)\n", (int) result, dds_err_str(result));
|
printf ("write: error %d (%s)\n", (int) result, dds_err_str(result));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
if (result != DDS_RETCODE_TIMEOUT)
|
if (result != DDS_RETCODE_TIMEOUT)
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1232,6 +1275,9 @@ static void pub_do_auto(const struct writerspec *spec) {
|
||||||
while (!termflag && tprev < tstop) {
|
while (!termflag && tprev < tstop) {
|
||||||
if ((result = dds_write(spec->wr, &d)) != DDS_RETCODE_OK) {
|
if ((result = dds_write(spec->wr, &d)) != DDS_RETCODE_OK) {
|
||||||
printf ("write: error %d (%s)\n", (int) result, dds_err_str(result));
|
printf ("write: error %d (%s)\n", (int) result, dds_err_str(result));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
if (result != DDS_RETCODE_TIMEOUT)
|
if (result != DDS_RETCODE_TIMEOUT)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1265,6 +1311,9 @@ static void pub_do_auto(const struct writerspec *spec) {
|
||||||
hist_print(hist, tlast - tfirst, 0);
|
hist_print(hist, tlast - tfirst, 0);
|
||||||
hist_free(hist);
|
hist_free(hist);
|
||||||
printf ("total writes: %" PRId64 " (%e/s)\n", ntot, (double)ntot * 1e9 / (double)(tlast - tfirst0));
|
printf ("total writes: %" PRId64 " (%e/s)\n", ntot, (double)ntot * 1e9 / (double)(tlast - tfirst0));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
if (spec->topicsel == KS) {
|
if (spec->topicsel == KS) {
|
||||||
dds_free(d.ks.baggage._buffer);
|
dds_free(d.ks.baggage._buffer);
|
||||||
}
|
}
|
||||||
|
@ -1319,6 +1368,9 @@ static char *pub_do_nonarb(const struct writerspec *spec, uint32_t *seq) {
|
||||||
tstamp = (tstamp_spec.t % T_SECOND) + ((int) (tstamp_spec.t / T_SECOND) * DDS_NSECS_IN_SEC);
|
tstamp = (tstamp_spec.t % T_SECOND) + ((int) (tstamp_spec.t / T_SECOND) * DDS_NSECS_IN_SEC);
|
||||||
if ((result = fn(spec->wr, &d, tstamp)) != DDS_RETCODE_OK) {
|
if ((result = fn(spec->wr, &d, tstamp)) != DDS_RETCODE_OK) {
|
||||||
printf ("%s %d: error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result));
|
printf ("%s %d: error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
if (!accept_error(command, result))
|
if (!accept_error(command, result))
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
@ -1326,6 +1378,9 @@ static char *pub_do_nonarb(const struct writerspec *spec, uint32_t *seq) {
|
||||||
dds_write_flush(spec->wr);
|
dds_write_flush(spec->wr);
|
||||||
if (spec->dupwr && (result = fn(spec->dupwr, &d, tstamp)) != DDS_RETCODE_OK) {
|
if (spec->dupwr && (result = fn(spec->dupwr, &d, tstamp)) != DDS_RETCODE_OK) {
|
||||||
printf ("%s %d(dup): error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result));
|
printf ("%s %d(dup): error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
if (!accept_error(command, result))
|
if (!accept_error(command, result))
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
@ -1337,11 +1392,17 @@ static char *pub_do_nonarb(const struct writerspec *spec, uint32_t *seq) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'z':
|
case 'z':
|
||||||
if (spec->topicsel != KS)
|
if (spec->topicsel != KS) {
|
||||||
printf ("payload size cannot be set for selected type\n");
|
printf ("payload size cannot be set for selected type\n");
|
||||||
else if (k < 12 && k != 0)
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
} else if (k < 12 && k != 0) {
|
||||||
printf ("invalid payload size: %d\n", k);
|
printf ("invalid payload size: %d\n", k);
|
||||||
else {
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
uint32_t baggagesize = (k != 0) ? (uint32_t) (k - 12) : 0;
|
uint32_t baggagesize = (k != 0) ? (uint32_t) (k - 12) : 0;
|
||||||
if (d.ks.baggage._buffer)
|
if (d.ks.baggage._buffer)
|
||||||
dds_free (d.ks.baggage._buffer);
|
dds_free (d.ks.baggage._buffer);
|
||||||
|
@ -1354,9 +1415,12 @@ static char *pub_do_nonarb(const struct writerspec *spec, uint32_t *seq) {
|
||||||
set_pub_partition(spec->pub, arg);
|
set_pub_partition(spec->pub, arg);
|
||||||
break;
|
break;
|
||||||
case 's':
|
case 's':
|
||||||
if (k < 0)
|
if (k < 0) {
|
||||||
printf ("invalid sleep duration: %ds\n", k);
|
printf ("invalid sleep duration: %ds\n", k);
|
||||||
else {
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
dds_sleepfor(DDS_SECS(k));
|
dds_sleepfor(DDS_SECS(k));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -1524,6 +1588,9 @@ static uint32_t pubthread(void *vwrspecs) {
|
||||||
cand = cursor;
|
cand = cursor;
|
||||||
else {
|
else {
|
||||||
printf ("%s: ambiguous writer specification\n", nextspec);
|
printf ("%s: ambiguous writer specification\n", nextspec);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1531,6 +1598,9 @@ static uint32_t pubthread(void *vwrspecs) {
|
||||||
} while (cursor != endm);
|
} while (cursor != endm);
|
||||||
if (cand == NULL) {
|
if (cand == NULL) {
|
||||||
printf ("%s: no matching writer specification\n", nextspec);
|
printf ("%s: no matching writer specification\n", nextspec);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
} else if (cursor != endm) { /* ambiguous case */
|
} else if (cursor != endm) { /* ambiguous case */
|
||||||
cursor = endm;
|
cursor = endm;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1699,6 +1769,9 @@ static uint32_t subthread(void *vspec) {
|
||||||
rc = dds_waitset_wait(ws, xs, nxs, DDS_INFINITY);
|
rc = dds_waitset_wait(ws, xs, nxs, DDS_INFINITY);
|
||||||
if (rc < DDS_RETCODE_OK) {
|
if (rc < DDS_RETCODE_OK) {
|
||||||
printf ("wait: error %d\n", (int) rc);
|
printf ("wait: error %d\n", (int) rc);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
} else if (rc == DDS_RETCODE_OK) {
|
} else if (rc == DDS_RETCODE_OK) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -1727,6 +1800,9 @@ static uint32_t subthread(void *vspec) {
|
||||||
status.current_count,
|
status.current_count,
|
||||||
status.current_count_change,
|
status.current_count_change,
|
||||||
status.last_publication_handle);
|
status.last_publication_handle);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1767,8 +1843,14 @@ static uint32_t subthread(void *vspec) {
|
||||||
; /* expected */
|
; /* expected */
|
||||||
} else if (spec->mode == MODE_CHECK || spec->mode == MODE_DUMP || spec->polling) {
|
} else if (spec->mode == MODE_CHECK || spec->mode == MODE_DUMP || spec->polling) {
|
||||||
printf ("%s: %d (%s) on %s\n", (!spec->use_take && spec->mode == MODE_DUMP) ? "read" : "take", (int) nread, dds_err_str(nread), spec->polling ? "poll" : "stcond");
|
printf ("%s: %d (%s) on %s\n", (!spec->use_take && spec->mode == MODE_DUMP) ? "read" : "take", (int) nread, dds_err_str(nread), spec->polling ? "poll" : "stcond");
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
printf ("%s: %d (%s) on rdcond%s\n", spec->use_take ? "take" : "read", (int) nread, dds_err_str(nread), (cond == rdcondA) ? "A" : (cond == rdcondD) ? "D" : "?");
|
printf ("%s: %d (%s) on rdcond%s\n", spec->use_take ? "take" : "read", (int) nread, dds_err_str(nread), (cond == rdcondA) ? "A" : (cond == rdcondD) ? "D" : "?");
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1825,6 +1907,9 @@ static uint32_t subthread(void *vspec) {
|
||||||
const double rate_Mbps = (double)(nreceived_bytes - last_nreceived_bytes) * 8 / 1e6;
|
const double rate_Mbps = (double)(nreceived_bytes - last_nreceived_bytes) * 8 / 1e6;
|
||||||
printf ("%"PRId64".%03"PRId64" ntot %lld nseq %lld ndelta %lld rate %.2f Mb/s\n",
|
printf ("%"PRId64".%03"PRId64" ntot %lld nseq %lld ndelta %lld rate %.2f Mb/s\n",
|
||||||
tdelta_s, tdelta_ms, nreceived, out_of_seq, ndelta, rate_Mbps);
|
tdelta_s, tdelta_ms, nreceived, out_of_seq, ndelta, rate_Mbps);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
last_nreceived = nreceived;
|
last_nreceived = nreceived;
|
||||||
last_nreceived_bytes = nreceived_bytes;
|
last_nreceived_bytes = nreceived_bytes;
|
||||||
tprint = tnow;
|
tprint = tnow;
|
||||||
|
@ -1854,10 +1939,14 @@ static uint32_t subthread(void *vspec) {
|
||||||
dds_return_t nread;
|
dds_return_t nread;
|
||||||
nread = dds_take_mask(rd, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples, DDS_ANY_STATE);
|
nread = dds_take_mask(rd, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples, DDS_ANY_STATE);
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
if (!once_mode)
|
if (!once_mode) {
|
||||||
printf ("-- final take: data reader empty --\n");
|
printf ("-- final take: data reader empty --\n");
|
||||||
else
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
exitcode = 1;
|
exitcode = 1;
|
||||||
|
}
|
||||||
} else if (nread < DDS_RETCODE_OK) {
|
} else if (nread < DDS_RETCODE_OK) {
|
||||||
if (!once_mode) {
|
if (!once_mode) {
|
||||||
error_report(rc, "-- final take --\n");
|
error_report(rc, "-- final take --\n");
|
||||||
|
@ -1888,8 +1977,12 @@ static uint32_t subthread(void *vspec) {
|
||||||
}
|
}
|
||||||
dds_free(iseq);
|
dds_free(iseq);
|
||||||
dds_free(mseq);
|
dds_free(mseq);
|
||||||
if (spec->mode == MODE_CHECK)
|
if (spec->mode == MODE_CHECK) {
|
||||||
printf ("received: %lld, out of seq: %lld\n", nreceived, out_of_seq);
|
printf ("received: %lld, out of seq: %lld\n", nreceived, out_of_seq);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
}
|
||||||
fini_eseq_admin(&eseq_admin);
|
fini_eseq_admin(&eseq_admin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1950,6 +2043,9 @@ static uint32_t autotermthread(void *varg __attribute__((unused))) {
|
||||||
|
|
||||||
if ((rc = dds_waitset_wait(ws, wsresults, wsresultsize, timeout)) < DDS_RETCODE_OK) {
|
if ((rc = dds_waitset_wait(ws, wsresults, wsresultsize, timeout)) < DDS_RETCODE_OK) {
|
||||||
printf ("wait: error %s\n", dds_err_str(rc));
|
printf ("wait: error %s\n", dds_err_str(rc));
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
tnow = dds_time();
|
tnow = dds_time();
|
||||||
|
@ -2020,8 +2116,12 @@ static dds_entity_t find_topic(dds_entity_t dpFindTopic, const char *name, const
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// TODO Note: the implementation for dds_topic_find blocks infinitely if the topic does not exist in the domain
|
// TODO Note: the implementation for dds_topic_find blocks infinitely if the topic does not exist in the domain
|
||||||
if (!(tp = dds_find_topic(dpFindTopic, name)))
|
if (!(tp = dds_find_topic(dpFindTopic, name))) {
|
||||||
printf ("topic %s not found\n", name);
|
printf ("topic %s not found\n", name);
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// if (!isbuiltin) {
|
// if (!isbuiltin) {
|
||||||
// char *tn = DDS_Topic_get_type_name(tp);
|
// char *tn = DDS_Topic_get_type_name(tp);
|
||||||
|
@ -2240,7 +2340,7 @@ int main(int argc, char *argv[]) {
|
||||||
wait_for_matching_reader_arg = optarg + pos;
|
wait_for_matching_reader_arg = optarg + pos;
|
||||||
break;
|
break;
|
||||||
case 'F':
|
case 'F':
|
||||||
setvbuf(stdout, (char *) NULL, _IOLBF, 0);
|
flushflag = 1;
|
||||||
break;
|
break;
|
||||||
case 'K':
|
case 'K':
|
||||||
addspec(SPEC_TOPICSEL, &spec_sofar, &specidx, &spec, want_reader);
|
addspec(SPEC_TOPICSEL, &spec_sofar, &specidx, &spec, want_reader);
|
||||||
|
@ -2629,6 +2729,9 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
if (want_writer && wait_for_matching_reader_arg) {
|
if (want_writer && wait_for_matching_reader_arg) {
|
||||||
printf("Wait for matching reader: unsupported\n");
|
printf("Wait for matching reader: unsupported\n");
|
||||||
|
if (flushflag) {
|
||||||
|
fflush (stdout);
|
||||||
|
}
|
||||||
// TODO Reimplement wait_for_matching_reader functionality via wait on status subscription matched
|
// TODO Reimplement wait_for_matching_reader functionality via wait on status subscription matched
|
||||||
// struct qos *q = NULL;
|
// struct qos *q = NULL;
|
||||||
// uint64_t tnow = dds_time();
|
// uint64_t tnow = dds_time();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue