diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index 8fe6eae..628778b 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -969,18 +969,24 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping) } } -static uint32_t subthread_waitset (void *varg) +static dds_entity_t make_reader_waitset (dds_entity_t rd) { - struct subthread_arg * const arg = varg; dds_entity_t ws; int32_t rc; ws = dds_create_waitset (dp); if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); - if ((rc = dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS)) < 0) - error2 ("dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); - if ((rc = dds_waitset_attach (ws, rd_data, 1)) < 0) - error2 ("dds_waitset_attach (ws, rd_data, 1): %d\n", (int) rc); + if ((rc = dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS)) < 0) + error2 ("dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS): %d\n", (int) rc); + if ((rc = dds_waitset_attach (ws, rd, 1)) < 0) + error2 ("dds_waitset_attach (ws, rd, 1): %d\n", (int) rc); + return ws; +} + +static uint32_t subthread_waitset (void *varg) +{ + struct subthread_arg * const arg = varg; + dds_entity_t ws = make_reader_waitset (rd_data); while (!ddsrt_atomic_ld32 (&termflag)) { if (!process_data (rd_data, arg)) @@ -998,15 +1004,7 @@ static uint32_t subthread_waitset (void *varg) static uint32_t subpingthread_waitset (void *varg) { struct subthread_arg * const arg = varg; - dds_entity_t ws; - int32_t rc; - ws = dds_create_waitset (dp); - if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) - error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); - if ((rc = dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS)) < 0) - error2 ("dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); - if ((rc = dds_waitset_attach (ws, rd_ping, 1)) < 0) - error2 ("dds_waitset_attach (ws, rd_ping, 1): %d\n", (int) rc); + dds_entity_t ws = make_reader_waitset (rd_ping); while (!ddsrt_atomic_ld32 (&termflag)) { int32_t nxs; @@ -1020,15 +1018,7 @@ static uint32_t subpingthread_waitset (void *varg) static uint32_t subpongthread_waitset (void *varg) { struct subthread_arg * const arg = varg; - dds_entity_t ws; - int32_t rc; - ws = dds_create_waitset (dp); - if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) - error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); - if ((rc = dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS)) < 0) - error2 ("dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); - if ((rc = dds_waitset_attach (ws, rd_pong, 1)) < 0) - error2 ("dds_waitset_attach (ws, rd_pong, 1): %d\n", (int) rc); + dds_entity_t ws = make_reader_waitset (rd_pong); while (!ddsrt_atomic_ld32 (&termflag)) { int32_t nxs; @@ -1101,7 +1091,7 @@ static dds_entity_t create_pong_writer (dds_instance_handle_t pphandle, const st return wr_pong; } -static void delete_pong_writer (dds_instance_handle_t pphandle) +static dds_entity_t delete_pong_writer (dds_instance_handle_t pphandle) { uint32_t i = 0; dds_entity_t wr_pong = 0; @@ -1118,8 +1108,7 @@ static void delete_pong_writer (dds_instance_handle_t pphandle) } } ddsrt_mutex_unlock (&pongwr_lock); - if (wr_pong) - dds_delete (wr_pong); + return wr_pong; } static void free_ppant (void *vpp) @@ -1143,6 +1132,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg) if (info.instance_state != DDS_ALIVE_INSTANCE_STATE) { ddsrt_avl_dpath_t dpath; + dds_entity_t pong_wr_to_del = 0; ddsrt_mutex_lock (&disc_lock); if ((pp = ddsrt_avl_lookup_dpath (&ppants_td, &ppants, &info.instance_handle, &dpath)) != NULL) { @@ -1151,7 +1141,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg) if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) { - delete_pong_writer (pp->handle); + pong_wr_to_del = delete_pong_writer (pp->handle); n_pong_expected_delta--; } @@ -1161,6 +1151,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg) free_ppant (pp); } ddsrt_mutex_unlock (&disc_lock); + dds_delete (pong_wr_to_del); } else { @@ -1171,6 +1162,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg) /* only add unknown participants with the magic user_data value: DDSPerf:X:HOSTNAME, where X is decimal */ if (dds_qget_userdata (sample->qos, &vudata, &usz) && usz > 0) { + bool make_pongwr = false; const char *udata = vudata; int has_reader, pos; long pid; @@ -1199,15 +1191,17 @@ static void participant_data_listener (dds_entity_t rd, void *arg) ddsrt_fibheap_insert (&ppants_to_match_fhd, &ppants_to_match, pp); ddsrt_avl_insert_ipath (&ppants_td, &ppants, pp, &ipath); - if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) - { - struct guidstr guidstr; - make_guidstr (&guidstr, &sample->key); - create_pong_writer (pp->handle, &guidstr); - n_pong_expected_delta++; - } + make_pongwr = (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE); } ddsrt_mutex_unlock (&disc_lock); + + if (make_pongwr) + { + struct guidstr guidstr; + make_guidstr (&guidstr, &sample->key); + create_pong_writer (pp->handle, &guidstr); + n_pong_expected_delta++; + } } dds_free (vudata); } @@ -1312,8 +1306,8 @@ static void subscription_matched_listener (dds_entity_t rd, const dds_subscripti static void publication_matched_listener (dds_entity_t wr, const dds_publication_matched_status_t status, void *arg) { /* this only works because the listener is called for every match; but I don't think that is something the - spec guarantees, and I don't think Cyclone should guarantee that either -- and if it isn't guaranteed - _really_ needs the get_matched_... interfaces to not have to implement the matching logic ... */ + spec guarantees, and I don't think Cyclone should guarantee that either -- and if it isn't guaranteed + _really_ needs the get_matched_... interfaces to not have to implement the matching logic ... */ (void) wr; if (status.current_count_change > 0) { @@ -1326,8 +1320,8 @@ static void publication_matched_listener (dds_entity_t wr, const dds_publication static void set_data_available_listener (dds_entity_t rd, const char *rd_name, dds_on_data_available_fn fn, void *arg) { /* This convoluted code is so that we leave all listeners unchanged, except the - data_available one. There is no real need for these complications, but it is - a nice exercise. */ + data_available one. There is no real need for these complications, but it is + a nice exercise. */ dds_listener_t *listener = dds_create_listener (arg); dds_return_t rc; dds_lset_data_available (listener, fn);