diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index d451770..0f7bbcf 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -61,6 +61,13 @@ #define MULTIDOMAIN 0 #endif +/* Set to > 0 for printing warnings to stderr for each message that was taken more than this many + ms after writing */ +#define REPORT_LATE_MESSAGES 0 + +/* Set to != 0 for periodically printing requests that have been blocked for more than 1s */ +#define REPORT_BLOCKED_REQUESTS 0 + #define RET_ERR_X(msg, code) do {RMW_SET_ERROR_MSG(msg); code;} while (0) #define RET_NULL_X(var, code) do {if (!var) RET_ERR_X (#var " is null", code);} while (0) #define RET_ALLOC_X(var, code) do {if (!var) RET_ERR_X ("failed to allocate " #var, code); \ @@ -147,6 +154,12 @@ struct CddsCS struct CddsClient { CddsCS client; + +#if REPORT_BLOCKED_REQUESTS + std::mutex lock; /* guards lastcheck and reqtime */ + dds_time_t lastcheck; /* time of the most recent blocked-request scan */ + std::map reqtime; /* request sequence number -> time it was sent */ +#endif }; struct CddsService @@ -214,6 +227,9 @@ struct Cdds static Cdds gcdds; static void clean_waitset_caches(); +#if REPORT_BLOCKED_REQUESTS +static void check_for_blocked_requests(CddsClient & client); +#endif #ifndef WIN32 /* TODO(allenh1): check for Clang */ @@ -820,7 +836,7 @@ static dds_qos_t * create_readwrite_qos( dds_delete_qos(qos); return nullptr; } - dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast(qos_policies->depth)); + dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast(qos_policies->depth)); } break; case RMW_QOS_POLICY_HISTORY_KEEP_ALL: @@ -843,9 +859,18 @@ static dds_qos_t * create_readwrite_qos( case RMW_QOS_POLICY_DURABILITY_VOLATILE: dds_qset_durability(qos, DDS_DURABILITY_VOLATILE); break; - case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); - break; + case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: { + /* Cyclone uses durability service QoS for determining what to retain as historical data, + separating the reliability window from 
the historical data; but that is somewhat unusual + among DDS implementations ... */ + dds_history_kind_t hk; + int32_t hd; + dds_qget_history(qos, &hk, &hd); /* read back the history settings held by qos */ + dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_durability_service(qos, DDS_SECS(0), hk, hd, DDS_LENGTH_UNLIMITED, + DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + break; + } } /* deadline, lifespan, liveliness are not yet supported */ if (ignore_local_publications) { @@ -1345,6 +1370,14 @@ static rmw_ret_t rmw_take_int( memcpy(message_info->publisher_gid.data, &info.publication_handle, sizeof(info.publication_handle)); } +#if REPORT_LATE_MESSAGES > 0 + dds_time_t tnow = dds_time(); + dds_time_t dt = tnow - info.source_timestamp; /* time since the source timestamp */ + if (dt >= DDS_MSECS(REPORT_LATE_MESSAGES)) { + fprintf(stderr, "** %s sample in history for %.fms\n", sub->sertopic->name, + static_cast(dt) / 1e6); + } +#endif *taken = true; return RMW_RET_OK; } @@ -1767,8 +1800,9 @@ extern "C" rmw_ret_t rmw_wait( (wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t) wait_timeout->sec * 1000000000 + wait_timeout->nsec; + ws->trigs.resize(ws->nelems + 1); /* size() is passed to dds_waitset_wait below */ const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(), - ws->trigs.capacity(), timeout); + ws->trigs.size(), timeout); ws->trigs.resize(ntrig); std::sort(ws->trigs.begin(), ws->trigs.end()); ws->trigs.push_back((dds_attach_t) -1); @@ -1800,6 +1834,12 @@ extern "C" rmw_ret_t rmw_wait( #undef DETACH } +#if REPORT_BLOCKED_REQUESTS + for (auto const & c : ws->cls) { + check_for_blocked_requests(*c); + } +#endif + { std::lock_guard lock(ws->lock); ws->inuse = false; @@ -1816,7 +1856,7 @@ extern "C" rmw_ret_t rmw_wait( static rmw_ret_t rmw_take_response_request( CddsCS * cs, rmw_request_id_t * request_header, - void * ros_data, bool * taken, + void * ros_data, bool * taken, dds_time_t * source_timestamp, dds_instance_handle_t srcfilter) { RET_NULL(taken); @@ -1832,6 +1872,9 @@ static rmw_ret_t rmw_take_response_request( assert(sizeof(wrap.header.guid) <= 
sizeof(request_header->writer_guid)); memcpy(request_header->writer_guid, &wrap.header.guid, sizeof(wrap.header.guid)); request_header->sequence_number = wrap.header.seq; + if (source_timestamp) { /* optional out-parameter; may be null */ + *source_timestamp = info.source_timestamp; + } if (srcfilter == 0 || srcfilter == wrap.header.guid) { *taken = true; return RMW_RET_OK; @@ -1849,10 +1892,46 @@ extern "C" rmw_ret_t rmw_take_response( { RET_WRONG_IMPLID(client); auto info = static_cast(client->data); - return rmw_take_response_request(&info->client, request_header, ros_response, taken, - info->client.pub->pubiid); + dds_time_t source_timestamp; + rmw_ret_t ret = rmw_take_response_request(&info->client, request_header, ros_response, taken, + &source_timestamp, info->client.pub->pubiid); + +#if REPORT_BLOCKED_REQUESTS + if (ret == RMW_RET_OK && *taken) { + std::lock_guard lock(info->lock); + uint64_t seq = request_header->sequence_number; + dds_time_t tnow = dds_time(); + dds_time_t dtresp = tnow - source_timestamp; /* NOTE(review): limit below is 0ms by default */ + dds_time_t dtreq = tnow - info->reqtime[seq]; /* NOTE(review): [] inserts if seq unknown */ + if (dtreq > DDS_MSECS(REPORT_LATE_MESSAGES) || dtresp > DDS_MSECS(REPORT_LATE_MESSAGES)) { + fprintf(stderr, "** %s response time %.fms; response in history for %.fms\n", + info->client.sub->sertopic->name, static_cast(dtreq) / 1e6, + static_cast(dtresp) / 1e6); + } + info->reqtime.erase(seq); /* response taken: stop tracking this request */ + } +#endif + return ret; } +#if REPORT_BLOCKED_REQUESTS +static void check_for_blocked_requests(CddsClient & client) +{ + dds_time_t tnow = dds_time(); + std::lock_guard lock(client.lock); + if (tnow > client.lastcheck + DDS_SECS(1)) { /* scan at most once per second */ + client.lastcheck = tnow; + for (auto const & r : client.reqtime) { + dds_time_t dt = tnow - r.second; /* how long this request has been pending */ + if (dt > DDS_SECS(1)) { + fprintf(stderr, "** %s already waiting for %.fms\n", client.client.sub->sertopic->name, + static_cast(dt) / 1e6); + } + } + } +} +#endif + extern "C" rmw_ret_t rmw_take_request( const rmw_service_t * service, rmw_request_id_t * request_header, void * ros_request, @@ -1860,7 +1939,7 @@ extern 
"C" rmw_ret_t rmw_take_request( { RET_WRONG_IMPLID(service); auto info = static_cast(service->data); - return rmw_take_response_request(&info->service, request_header, ros_request, taken, 0); + return rmw_take_response_request(&info->service, request_header, ros_request, taken, nullptr, 0); } static rmw_ret_t rmw_send_response_request( @@ -1902,6 +1981,14 @@ extern "C" rmw_ret_t rmw_send_request( cdds_request_header_t header; header.guid = info->client.pub->pubiid; header.seq = *sequence_id = ++next_request_id; + +#if REPORT_BLOCKED_REQUESTS + { + std::lock_guard lock(info->lock); + info->reqtime[header.seq] = dds_time(); /* remember when this request was sent */ + } +#endif + return rmw_send_response_request(&info->client, header, ros_request); } @@ -2052,6 +2139,9 @@ extern "C" rmw_client_t * rmw_create_client( const rmw_qos_profile_t * qos_policies) { CddsClient * info = new CddsClient(); +#if REPORT_BLOCKED_REQUESTS + info->lastcheck = 0; +#endif if (rmw_init_cs(&info->client, node, type_supports, service_name, qos_policies, false) != RMW_RET_OK) {