diff --git a/src/tools/pubsub/pubsub.c b/src/tools/pubsub/pubsub.c index 6067fa0..5a9f0d8 100755 --- a/src/tools/pubsub/pubsub.c +++ b/src/tools/pubsub/pubsub.c @@ -1222,6 +1222,8 @@ static void pub_do_auto(const struct writerspec *spec) { } if (++bi == spec->burstsize) { while (((ntot / spec->burstsize) / ((t - tfirst0) / 1e9 + 5e-3)) > spec->writerate && !termflag) { + /* FIXME: only doing this manually because batching is not yet implemented properly */ + dds_write_flush(spec->wr); dds_sleepfor(DDS_MSECS(10)); t = dds_time(); } @@ -1292,11 +1294,17 @@ static char *pub_do_nonarb(const struct writerspec *spec, uint32_t *seq) { if (!accept_error(command, result)) exit(1); } + /* FIXME: only doing this manually because batching is not yet implemented properly */ + dds_write_flush(spec->wr); if (spec->dupwr && (result = fn(spec->dupwr, &d, tstamp)) != DDS_RETCODE_OK) { printf ("%s %d(dup): error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result)); if (!accept_error(command, result)) exit(1); } + if (spec->dupwr) { + /* FIXME: only doing this manually because batching is not yet implemented properly */ + dds_write_flush(spec->wr); + } d.seq++; break; } @@ -2492,6 +2500,7 @@ int MAIN(int argc, char *argv[]) { common_init(argv[0]); set_systemid_env(); + dds_write_set_batch(true); // FIXME: hack (the global batching flag is a hack anyway) { char **ps = (char **) dds_alloc(sizeof(char *) * (argc - os_get_optind()));