Fix detecting remote ping/pong writers in ddsperf

The status mask on some readers got reduced to just "data available"
when used in conjunction with a waitset, but the consequence is that the
"subscription matched" listener would be suppressed.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-02-11 11:59:26 +01:00 committed by eboasson
parent ab7c95e02f
commit b84eee5abb

View file

@ -969,18 +969,24 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping)
} }
} }
static uint32_t subthread_waitset (void *varg) static dds_entity_t make_reader_waitset (dds_entity_t rd)
{ {
struct subthread_arg * const arg = varg;
dds_entity_t ws; dds_entity_t ws;
int32_t rc; int32_t rc;
ws = dds_create_waitset (dp); ws = dds_create_waitset (dp);
if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0)
error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc);
if ((rc = dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS)) < 0) if ((rc = dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS)) < 0)
error2 ("dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); error2 ("dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS): %d\n", (int) rc);
if ((rc = dds_waitset_attach (ws, rd_data, 1)) < 0) if ((rc = dds_waitset_attach (ws, rd, 1)) < 0)
error2 ("dds_waitset_attach (ws, rd_data, 1): %d\n", (int) rc); error2 ("dds_waitset_attach (ws, rd, 1): %d\n", (int) rc);
return ws;
}
static uint32_t subthread_waitset (void *varg)
{
struct subthread_arg * const arg = varg;
dds_entity_t ws = make_reader_waitset (rd_data);
while (!ddsrt_atomic_ld32 (&termflag)) while (!ddsrt_atomic_ld32 (&termflag))
{ {
if (!process_data (rd_data, arg)) if (!process_data (rd_data, arg))
@ -998,15 +1004,7 @@ static uint32_t subthread_waitset (void *varg)
static uint32_t subpingthread_waitset (void *varg) static uint32_t subpingthread_waitset (void *varg)
{ {
struct subthread_arg * const arg = varg; struct subthread_arg * const arg = varg;
dds_entity_t ws; dds_entity_t ws = make_reader_waitset (rd_ping);
int32_t rc;
ws = dds_create_waitset (dp);
if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0)
error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc);
if ((rc = dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS)) < 0)
error2 ("dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
if ((rc = dds_waitset_attach (ws, rd_ping, 1)) < 0)
error2 ("dds_waitset_attach (ws, rd_ping, 1): %d\n", (int) rc);
while (!ddsrt_atomic_ld32 (&termflag)) while (!ddsrt_atomic_ld32 (&termflag))
{ {
int32_t nxs; int32_t nxs;
@ -1020,15 +1018,7 @@ static uint32_t subpingthread_waitset (void *varg)
static uint32_t subpongthread_waitset (void *varg) static uint32_t subpongthread_waitset (void *varg)
{ {
struct subthread_arg * const arg = varg; struct subthread_arg * const arg = varg;
dds_entity_t ws; dds_entity_t ws = make_reader_waitset (rd_pong);
int32_t rc;
ws = dds_create_waitset (dp);
if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0)
error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc);
if ((rc = dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS)) < 0)
error2 ("dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
if ((rc = dds_waitset_attach (ws, rd_pong, 1)) < 0)
error2 ("dds_waitset_attach (ws, rd_pong, 1): %d\n", (int) rc);
while (!ddsrt_atomic_ld32 (&termflag)) while (!ddsrt_atomic_ld32 (&termflag))
{ {
int32_t nxs; int32_t nxs;
@ -1101,7 +1091,7 @@ static dds_entity_t create_pong_writer (dds_instance_handle_t pphandle, const st
return wr_pong; return wr_pong;
} }
static void delete_pong_writer (dds_instance_handle_t pphandle) static dds_entity_t delete_pong_writer (dds_instance_handle_t pphandle)
{ {
uint32_t i = 0; uint32_t i = 0;
dds_entity_t wr_pong = 0; dds_entity_t wr_pong = 0;
@ -1118,8 +1108,7 @@ static void delete_pong_writer (dds_instance_handle_t pphandle)
} }
} }
ddsrt_mutex_unlock (&pongwr_lock); ddsrt_mutex_unlock (&pongwr_lock);
if (wr_pong) return wr_pong;
dds_delete (wr_pong);
} }
static void free_ppant (void *vpp) static void free_ppant (void *vpp)
@ -1143,6 +1132,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
if (info.instance_state != DDS_ALIVE_INSTANCE_STATE) if (info.instance_state != DDS_ALIVE_INSTANCE_STATE)
{ {
ddsrt_avl_dpath_t dpath; ddsrt_avl_dpath_t dpath;
dds_entity_t pong_wr_to_del = 0;
ddsrt_mutex_lock (&disc_lock); ddsrt_mutex_lock (&disc_lock);
if ((pp = ddsrt_avl_lookup_dpath (&ppants_td, &ppants, &info.instance_handle, &dpath)) != NULL) if ((pp = ddsrt_avl_lookup_dpath (&ppants_td, &ppants, &info.instance_handle, &dpath)) != NULL)
{ {
@ -1151,7 +1141,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE)
{ {
delete_pong_writer (pp->handle); pong_wr_to_del = delete_pong_writer (pp->handle);
n_pong_expected_delta--; n_pong_expected_delta--;
} }
@ -1161,6 +1151,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
free_ppant (pp); free_ppant (pp);
} }
ddsrt_mutex_unlock (&disc_lock); ddsrt_mutex_unlock (&disc_lock);
dds_delete (pong_wr_to_del);
} }
else else
{ {
@ -1171,6 +1162,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
/* only add unknown participants with the magic user_data value: DDSPerf:X:HOSTNAME, where X is decimal */ /* only add unknown participants with the magic user_data value: DDSPerf:X:HOSTNAME, where X is decimal */
if (dds_qget_userdata (sample->qos, &vudata, &usz) && usz > 0) if (dds_qget_userdata (sample->qos, &vudata, &usz) && usz > 0)
{ {
bool make_pongwr = false;
const char *udata = vudata; const char *udata = vudata;
int has_reader, pos; int has_reader, pos;
long pid; long pid;
@ -1199,7 +1191,11 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
ddsrt_fibheap_insert (&ppants_to_match_fhd, &ppants_to_match, pp); ddsrt_fibheap_insert (&ppants_to_match_fhd, &ppants_to_match, pp);
ddsrt_avl_insert_ipath (&ppants_td, &ppants, pp, &ipath); ddsrt_avl_insert_ipath (&ppants_td, &ppants, pp, &ipath);
if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) make_pongwr = (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE);
}
ddsrt_mutex_unlock (&disc_lock);
if (make_pongwr)
{ {
struct guidstr guidstr; struct guidstr guidstr;
make_guidstr (&guidstr, &sample->key); make_guidstr (&guidstr, &sample->key);
@ -1207,8 +1203,6 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
n_pong_expected_delta++; n_pong_expected_delta++;
} }
} }
ddsrt_mutex_unlock (&disc_lock);
}
dds_free (vudata); dds_free (vudata);
} }
} }