diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index 06f1970..d270095 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -123,7 +123,7 @@ static int32_t histdepth = 0; /* Publishing rate in Hz, HUGE_VAL means as fast as possible, 0 means no throughput data is published at all */ -static double rate; +static double pub_rate; /* Fraction of throughput data samples that double as a ping message */ @@ -138,6 +138,10 @@ static dds_ignorelocal_kind_t ignorelocal = DDS_IGNORELOCAL_PARTICIPANT; possible, DDS_INFINITY means never */ static dds_duration_t ping_intv; +/* Number of times a new ping was sent before all expected + pongs had been received */ +static uint32_t ping_timeouts = 0; + static ddsrt_mutex_t disc_lock; /* Publisher statistics and lock protecting it */ @@ -582,12 +586,12 @@ static uint32_t pubthread (void *varg) data.seq_keyval.keyval = (data.seq_keyval.keyval + 1) % (int32_t) nkeyvals; data.seq++; - if (rate < HUGE_VAL) + if (pub_rate < HUGE_VAL) { if (++bi == burstsize) { /* FIXME: should average rate over a short-ish period, rather than over the entire run */ - while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > rate && !termflag) + while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > pub_rate && !termflag) { /* FIXME: flushing manually because batching is not yet implemented properly */ dds_write_flush (wr_data); @@ -742,7 +746,6 @@ static dds_entity_t get_pong_writer_locked (dds_instance_handle_t pubhandle) if (pongwr[i].pubhandle == 0) { pongwr[i].pubhandle = pubhandle; - ddsrt_mutex_unlock (&pongwr_lock); return wr_pong; } else @@ -752,7 +755,6 @@ static dds_entity_t get_pong_writer_locked (dds_instance_handle_t pubhandle) pongwr[npongwr].pphandle = pphandle; pongwr[npongwr].wr_pong = wr_pong; npongwr++; - ddsrt_mutex_unlock (&pongwr_lock); return wr_pong; } } @@ -885,19 +887,32 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping) } else { - if (tnow > twarn_ping_timeout) + if (n_pong_seen < n_pong_expected) { - printf ("[%"PRIdPID"] ping timed out ... sending new ping\n", ddsrt_getpid ()); - fflush (stdout); + ping_timeouts++; + if (tnow > twarn_ping_timeout) + { + printf ("[%"PRIdPID"] ping timed out (total %"PRIu32" times) ... sending new ping\n", ddsrt_getpid (), ping_timeouts); + twarn_ping_timeout = tnow + DDS_SECS (1); + fflush (stdout); + } } n_pong_seen = 0; - cur_ping_time = tnow; - if (ping_intv > 0) + if (ping_intv == 0) + { + *tnextping = tnow + DDS_SECS (1); + cur_ping_time = tnow; + } + else + { + /* tnow should be ~ cur_ping_time + ping_intv, but it won't be if the + wakeup was delayed significantly, the machine was suspended in the + meantime, so slow down if we can't keep up */ + cur_ping_time += ping_intv; + if (cur_ping_time < tnow - ping_intv / 2) + cur_ping_time = tnow; *tnextping = cur_ping_time + ping_intv; - else if (ping_intv == 0) - *tnextping = cur_ping_time + DDS_SECS (1); - if (ping_intv > 0 && *tnextping > twarn_ping_timeout) - twarn_ping_timeout = *tnextping + ping_intv / 2; + } cur_ping_seq++; baggage = init_sample (&data, cur_ping_seq); ddsrt_mutex_unlock (&pongwr_lock); @@ -1053,7 +1068,7 @@ static void delete_pong_writer (dds_instance_handle_t pphandle) else { assert (wr_pong == 0 || wr_pong == pongwr[i].wr_pong); - memmove (&pongwr[i], &pongwr[i+1], (npongwr - i) * sizeof (pongwr[0])); + memmove (&pongwr[i], &pongwr[i+1], (npongwr - i - 1) * sizeof (pongwr[0])); npongwr--; } } @@ -1062,6 +1077,13 @@ static void delete_pong_writer (dds_instance_handle_t pphandle) dds_delete (wr_pong); } +static void free_ppant (void *vpp) +{ + struct ppant *pp = vpp; + free (pp->hostname); + free (pp); +} + static void participant_data_listener (dds_entity_t rd, void *arg) { dds_sample_info_t info; @@ -1091,7 +1113,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg) ddsrt_avl_delete_dpath (&ppants_td, &ppants, pp, &dpath); if (pp->tdeadline != DDS_NEVER) ddsrt_fibheap_delete (&ppants_to_match_fhd, &ppants_to_match, pp); - free (pp); + free_ppant (pp); } ddsrt_mutex_unlock (&disc_lock); } @@ -1288,7 +1310,7 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) const double ts = (double) (tnow - tstart) / 1e9; snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts); - if (rate > 0) + if (pub_rate > 0) { ddsrt_mutex_lock (&pubstat_lock); hist_print (prefix, pubstat_hist, tnow - tprev, 1); @@ -1519,15 +1541,15 @@ static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) { int pos; - double r; + double ping_rate; if (strcmp (xargv[*xoptind], "inf") == 0) { ping_intv = 0; } - else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0)) + else if (sscanf (xargv[*xoptind], "%lf%n", &ping_rate, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0)) { - if (r == 0) ping_intv = DDS_INFINITY; - else if (r > 0) ping_intv = (dds_duration_t) (1e9 / rate + 0.5); + if (ping_rate == 0) ping_intv = DDS_INFINITY; + else if (ping_rate > 0) ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5); else error3 ("%s: invalid ping rate\n", xargv[*xoptind]); } else @@ -1566,7 +1588,7 @@ static void set_mode_sub (int *xoptind, int xargc, char * const xargv[]) static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) { - rate = HUGE_VAL; + pub_rate = HUGE_VAL; burstsize = 1; ping_frac = 0; while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) @@ -1575,12 +1597,12 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) double r; if (strcmp (xargv[*xoptind], "inf") == 0 || strcmp (xargv[*xoptind], "infHz") == 0) { - rate = HUGE_VAL; + pub_rate = HUGE_VAL; } else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0)) { if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]); - rate = r; + pub_rate = r; } else if (strcmp (xargv[*xoptind], "burst") == 0) { @@ -1597,7 +1619,7 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]); ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); } - else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0) + else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &pub_rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0) { ++(*xoptind); if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]); @@ -1614,7 +1636,7 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) static void set_mode (int xoptind, int xargc, char * const xargv[]) { int code; - rate = 0.0; + pub_rate = 0.0; submode = SM_NONE; pingpongmode = SM_LISTENER; ping_intv = (xoptind == xargc) ? DDS_SECS (1) : DDS_INFINITY; @@ -1872,7 +1894,7 @@ int main (int argc, char *argv[]) memset (&subtid, 0, sizeof (subtid)); memset (&subpingtid, 0, sizeof (subpingtid)); memset (&subpongtid, 0, sizeof (subpongtid)); - if (rate > 0) + if (pub_rate > 0) ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL); if (subthread_func != 0) ddsrt_thread_create (&subtid, "sub", &attr, subthread_func, &subarg_data); @@ -2007,7 +2029,7 @@ int main (int argc, char *argv[]) } #endif - if (rate > 0) + if (pub_rate > 0) ddsrt_thread_join (pubtid, NULL); if (subthread_func != 0) ddsrt_thread_join (subtid, NULL); @@ -2054,6 +2076,10 @@ int main (int argc, char *argv[]) ddsrt_mutex_destroy (&pongstat_lock); ddsrt_mutex_destroy (&pubstat_lock); hist_free (pubstat_hist); + free (pongwr); + for (uint32_t i = 0; i < npongstat; i++) + free (pongstat[i].raw); + free (pongstat); bool ok = true; @@ -2068,6 +2094,9 @@ int main (int argc, char *argv[]) ok = false; } } + + ddsrt_avl_free (&ppants_td, &ppants, free_ppant); + if (matchcount < minmatch) { printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32" instead of %"PRIu32")\n", ddsrt_getpid (), matchcount, minmatch);