Optional reporting of late messages

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-09-02 15:44:46 +02:00 committed by eboasson
parent a63cc8b84d
commit 60a87ab4ab

View file

@ -61,6 +61,13 @@
#define MULTIDOMAIN 0 #define MULTIDOMAIN 0
#endif #endif
/* Set to > 0 for printing warnings to stderr for each messages that was taken more than this many
ms after writing */
#define REPORT_LATE_MESSAGES 0
/* Set to != 0 for periodically printing requests that have been blocked for more than 1s */
#define REPORT_BLOCKED_REQUESTS 0
#define RET_ERR_X(msg, code) do {RMW_SET_ERROR_MSG(msg); code;} while (0) #define RET_ERR_X(msg, code) do {RMW_SET_ERROR_MSG(msg); code;} while (0)
#define RET_NULL_X(var, code) do {if (!var) RET_ERR_X (#var " is null", code);} while (0) #define RET_NULL_X(var, code) do {if (!var) RET_ERR_X (#var " is null", code);} while (0)
#define RET_ALLOC_X(var, code) do {if (!var) RET_ERR_X ("failed to allocate " #var, code); \ #define RET_ALLOC_X(var, code) do {if (!var) RET_ERR_X ("failed to allocate " #var, code); \
@ -147,6 +154,12 @@ struct CddsCS
struct CddsClient struct CddsClient
{ {
CddsCS client; CddsCS client;
#if REPORT_BLOCKED_REQUESTS
std::mutex lock;
dds_time_t lastcheck;
std::map<int64_t, dds_time_t> reqtime;
#endif
}; };
struct CddsService struct CddsService
@ -214,6 +227,9 @@ struct Cdds
static Cdds gcdds; static Cdds gcdds;
static void clean_waitset_caches(); static void clean_waitset_caches();
#if REPORT_BLOCKED_REQUESTS
static void check_for_blocked_requests(CddsClient & client);
#endif
#ifndef WIN32 #ifndef WIN32
/* TODO(allenh1): check for Clang */ /* TODO(allenh1): check for Clang */
@ -820,7 +836,7 @@ static dds_qos_t * create_readwrite_qos(
dds_delete_qos(qos); dds_delete_qos(qos);
return nullptr; return nullptr;
} }
dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast<uint32_t>(qos_policies->depth)); dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast<int32_t>(qos_policies->depth));
} }
break; break;
case RMW_QOS_POLICY_HISTORY_KEEP_ALL: case RMW_QOS_POLICY_HISTORY_KEEP_ALL:
@ -843,10 +859,19 @@ static dds_qos_t * create_readwrite_qos(
case RMW_QOS_POLICY_DURABILITY_VOLATILE: case RMW_QOS_POLICY_DURABILITY_VOLATILE:
dds_qset_durability(qos, DDS_DURABILITY_VOLATILE); dds_qset_durability(qos, DDS_DURABILITY_VOLATILE);
break; break;
case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: {
/* Cyclone uses durability service QoS for determining what to retain as historical data,
separating the reliability window from the historical data; but that is somewhat unusual
among DDS implementations ... */
dds_history_kind_t hk;
int32_t hd;
dds_qget_history(qos, &hk, &hd);
dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_durability_service(qos, DDS_SECS(0), hk, hd, DDS_LENGTH_UNLIMITED,
DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
break; break;
} }
}
/* deadline, lifespan, liveliness are not yet supported */ /* deadline, lifespan, liveliness are not yet supported */
if (ignore_local_publications) { if (ignore_local_publications) {
dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT); dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT);
@ -1345,6 +1370,14 @@ static rmw_ret_t rmw_take_int(
memcpy(message_info->publisher_gid.data, &info.publication_handle, memcpy(message_info->publisher_gid.data, &info.publication_handle,
sizeof(info.publication_handle)); sizeof(info.publication_handle));
} }
#if REPORT_LATE_MESSAGES > 0
dds_time_t tnow = dds_time();
dds_time_t dt = tnow - info.source_timestamp;
if (dt >= DDS_MSECS(REPORT_LATE_MESSAGES)) {
fprintf(stderr, "** %s sample in history for %.fms\n", sub->sertopic->name,
static_cast<double>(dt) / 1e6);
}
#endif
*taken = true; *taken = true;
return RMW_RET_OK; return RMW_RET_OK;
} }
@ -1767,8 +1800,9 @@ extern "C" rmw_ret_t rmw_wait(
(wait_timeout == NULL) ? (wait_timeout == NULL) ?
DDS_NEVER : DDS_NEVER :
(dds_duration_t) wait_timeout->sec * 1000000000 + wait_timeout->nsec; (dds_duration_t) wait_timeout->sec * 1000000000 + wait_timeout->nsec;
ws->trigs.resize(ws->nelems + 1);
const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(), const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(),
ws->trigs.capacity(), timeout); ws->trigs.size(), timeout);
ws->trigs.resize(ntrig); ws->trigs.resize(ntrig);
std::sort(ws->trigs.begin(), ws->trigs.end()); std::sort(ws->trigs.begin(), ws->trigs.end());
ws->trigs.push_back((dds_attach_t) -1); ws->trigs.push_back((dds_attach_t) -1);
@ -1800,6 +1834,12 @@ extern "C" rmw_ret_t rmw_wait(
#undef DETACH #undef DETACH
} }
#if REPORT_BLOCKED_REQUESTS
for (auto const & c : ws->cls) {
check_for_blocked_requests(*c);
}
#endif
{ {
std::lock_guard<std::mutex> lock(ws->lock); std::lock_guard<std::mutex> lock(ws->lock);
ws->inuse = false; ws->inuse = false;
@ -1816,7 +1856,7 @@ extern "C" rmw_ret_t rmw_wait(
static rmw_ret_t rmw_take_response_request( static rmw_ret_t rmw_take_response_request(
CddsCS * cs, rmw_request_id_t * request_header, CddsCS * cs, rmw_request_id_t * request_header,
void * ros_data, bool * taken, void * ros_data, bool * taken, dds_time_t * source_timestamp,
dds_instance_handle_t srcfilter) dds_instance_handle_t srcfilter)
{ {
RET_NULL(taken); RET_NULL(taken);
@ -1832,6 +1872,9 @@ static rmw_ret_t rmw_take_response_request(
assert(sizeof(wrap.header.guid) <= sizeof(request_header->writer_guid)); assert(sizeof(wrap.header.guid) <= sizeof(request_header->writer_guid));
memcpy(request_header->writer_guid, &wrap.header.guid, sizeof(wrap.header.guid)); memcpy(request_header->writer_guid, &wrap.header.guid, sizeof(wrap.header.guid));
request_header->sequence_number = wrap.header.seq; request_header->sequence_number = wrap.header.seq;
if (source_timestamp) {
*source_timestamp = info.source_timestamp;
}
if (srcfilter == 0 || srcfilter == wrap.header.guid) { if (srcfilter == 0 || srcfilter == wrap.header.guid) {
*taken = true; *taken = true;
return RMW_RET_OK; return RMW_RET_OK;
@ -1849,10 +1892,46 @@ extern "C" rmw_ret_t rmw_take_response(
{ {
RET_WRONG_IMPLID(client); RET_WRONG_IMPLID(client);
auto info = static_cast<CddsClient *>(client->data); auto info = static_cast<CddsClient *>(client->data);
return rmw_take_response_request(&info->client, request_header, ros_response, taken, dds_time_t source_timestamp;
info->client.pub->pubiid); rmw_ret_t ret = rmw_take_response_request(&info->client, request_header, ros_response, taken,
&source_timestamp, info->client.pub->pubiid);
#if REPORT_BLOCKED_REQUESTS
if (ret == RMW_RET_OK && *taken) {
std::lock_guard<std::mutex> lock(info->lock);
uint64_t seq = request_header->sequence_number;
dds_time_t tnow = dds_time();
dds_time_t dtresp = tnow - source_timestamp;
dds_time_t dtreq = tnow - info->reqtime[seq];
if (dtreq > DDS_MSECS(REPORT_LATE_MESSAGES) || dtresp > DDS_MSECS(REPORT_LATE_MESSAGES)) {
fprintf(stderr, "** %s response time %.fms; response in history for %.fms\n",
info->client.sub->sertopic->name, static_cast<double>(dtreq) / 1e6,
static_cast<double>(dtresp) / 1e6);
}
info->reqtime.erase(seq);
}
#endif
return ret;
} }
#if REPORT_BLOCKED_REQUESTS
static void check_for_blocked_requests(CddsClient & client)
{
dds_time_t tnow = dds_time();
std::lock_guard<std::mutex> lock(client.lock);
if (tnow > client.lastcheck + DDS_SECS(1)) {
client.lastcheck = tnow;
for (auto const & r : client.reqtime) {
dds_time_t dt = tnow - r.second;
if (dt > DDS_SECS(1)) {
fprintf(stderr, "** %s already waiting for %.fms\n", client.client.sub->sertopic->name,
static_cast<double>(dt) / 1e6);
}
}
}
}
#endif
extern "C" rmw_ret_t rmw_take_request( extern "C" rmw_ret_t rmw_take_request(
const rmw_service_t * service, const rmw_service_t * service,
rmw_request_id_t * request_header, void * ros_request, rmw_request_id_t * request_header, void * ros_request,
@ -1860,7 +1939,7 @@ extern "C" rmw_ret_t rmw_take_request(
{ {
RET_WRONG_IMPLID(service); RET_WRONG_IMPLID(service);
auto info = static_cast<CddsService *>(service->data); auto info = static_cast<CddsService *>(service->data);
return rmw_take_response_request(&info->service, request_header, ros_request, taken, 0); return rmw_take_response_request(&info->service, request_header, ros_request, taken, nullptr, 0);
} }
static rmw_ret_t rmw_send_response_request( static rmw_ret_t rmw_send_response_request(
@ -1902,6 +1981,14 @@ extern "C" rmw_ret_t rmw_send_request(
cdds_request_header_t header; cdds_request_header_t header;
header.guid = info->client.pub->pubiid; header.guid = info->client.pub->pubiid;
header.seq = *sequence_id = ++next_request_id; header.seq = *sequence_id = ++next_request_id;
#if REPORT_BLOCKED_REQUESTS
{
std::lock_guard<std::mutex> lock(info->lock);
info->reqtime[header.seq] = dds_time();
}
#endif
return rmw_send_response_request(&info->client, header, ros_request); return rmw_send_response_request(&info->client, header, ros_request);
} }
@ -2052,6 +2139,9 @@ extern "C" rmw_client_t * rmw_create_client(
const rmw_qos_profile_t * qos_policies) const rmw_qos_profile_t * qos_policies)
{ {
CddsClient * info = new CddsClient(); CddsClient * info = new CddsClient();
#if REPORT_BLOCKED_REQUESTS
info->lastcheck = 0;
#endif
if (rmw_init_cs(&info->client, node, type_supports, service_name, qos_policies, if (rmw_init_cs(&info->client, node, type_supports, service_name, qos_policies,
false) != RMW_RET_OK) false) != RMW_RET_OK)
{ {