WIP: backport message flow tracing to foxy

This commit is contained in:
Niklas Halle 2025-05-17 14:18:04 +02:00
parent c12abc5698
commit 79e07924d6
3 changed files with 77 additions and 3 deletions

View file

@ -27,6 +27,7 @@ endif()
find_package(ament_cmake_ros REQUIRED)
find_package(tracetools REQUIRED)
find_package(rcutils REQUIRED)
find_package(rcpputils REQUIRED)
@ -44,6 +45,7 @@ find_package(rosidl_runtime_c REQUIRED)
find_package(rosidl_typesupport_introspection_c REQUIRED)
find_package(rosidl_typesupport_introspection_cpp REQUIRED)
ament_export_dependencies(tracetools)
ament_export_dependencies(rcutils)
ament_export_dependencies(rcpputils)
ament_export_dependencies(rmw)
@ -84,6 +86,7 @@ endif()
ament_target_dependencies(rmw_cyclonedds_cpp
"tracetools"
"rcutils"
"rcpputils"
"rosidl_typesupport_introspection_c"

View file

@ -9,6 +9,7 @@
<buildtool_depend>ament_cmake_ros</buildtool_depend>
<depend>tracetools</depend>
<depend>cyclonedds</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>

View file

@ -74,6 +74,8 @@
#include "rosidl_typesupport_cpp/message_type_support.hpp"
#include "tracetools/tracetools.h"
#include "namespace_prefix.hpp"
#include "dds/dds.h"
@ -1544,6 +1546,8 @@ extern "C" rmw_ret_t rmw_publish(
return RMW_RET_INVALID_ARGUMENT);
auto pub = static_cast<CddsPublisher *>(publisher->data);
assert(pub);
const dds_time_t tstamp = dds_time();
TRACEPOINT(rmw_publish, (const void *)publisher, ros_message, tstamp);
if (dds_write(pub->enth, ros_message) >= 0) {
return RMW_RET_OK;
} else {
@ -1567,6 +1571,9 @@ extern "C" rmw_ret_t rmw_publish_serialized_message(
serialized_message, "serialized message handle is null",
return RMW_RET_INVALID_ARGUMENT);
auto pub = static_cast<CddsPublisher *>(publisher->data);
const dds_time_t tstamp = dds_time();
TRACEPOINT(rmw_publish, (const void *)publisher, serialized_message, tstamp);
struct ddsi_serdata * d = serdata_rmw_from_serialized_message(
pub->sertopic, serialized_message->buffer, serialized_message->buffer_length);
const bool ok = (dds_writecdr(pub->enth, d) >= 0);
@ -2062,6 +2069,7 @@ extern "C" rmw_publisher_t * rmw_create_publisher(
}
cleanup_publisher.cancel();
TRACEPOINT(rmw_publisher_init, static_cast<const void *>(pub), cddspub->gid.data);
return pub;
}
@ -2442,6 +2450,7 @@ extern "C" rmw_subscription_t * rmw_create_subscription(
}
cleanup_subscription.cancel();
TRACEPOINT(rmw_subscription_init, static_cast<const void *>(sub), cddssub->gid.data);
return sub;
}
@ -2597,10 +2606,17 @@ static rmw_ret_t rmw_take_int(
fprintf(stderr, "** sample in history for %.fms\n", static_cast<double>(dt) / 1e6);
}
#endif
return RMW_RET_OK;
goto take_done;
}
}
*taken = false;
take_done:
TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(ros_message),
(message_info ? message_info->source_timestamp : 0LL),
*taken);
return RMW_RET_OK;
}
@ -2744,11 +2760,23 @@ static rmw_ret_t rmw_take_ser_int(
serialized_message->buffer_length = d->size();
ddsi_serdata_unref(dcmn);
*taken = true;
TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
(message_info ? message_info->source_timestamp : 0LL),
*taken);
return RMW_RET_OK;
}
ddsi_serdata_unref(dcmn);
}
*taken = false;
TRACEPOINT(
rmw_take,
static_cast<const void *>(subscription),
static_cast<const void *>(serialized_message),
0LL,
*taken);
return RMW_RET_OK;
}
@ -3558,6 +3586,14 @@ extern "C" rmw_ret_t rmw_take_response(
info->reqtime.erase(seq);
}
#endif
TRACEPOINT(
rmw_take_response,
static_cast<const void *>(client),
static_cast<const void *>(ros_response),
(nullptr != request_header ? request_header->request_id.sequence_number : 0LL),
(nullptr != request_header ? request_header->source_timestamp : 0LL),
// rmw_take_response_request() will not take if taken==nullptr
(nullptr != taken ? *taken : false));
return ret;
}
@ -3589,9 +3625,21 @@ extern "C" rmw_ret_t rmw_take_request(
service->implementation_identifier, eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
auto info = static_cast<CddsService *>(service->data);
return rmw_take_response_request(
rmw_ret_t ret = rmw_take_response_request(
&info->service, request_header, ros_request, taken, nullptr,
false);
// Do not use the whole request_header->writer_guid, see the rmw_client_init tracepoint trigger
rmw_gid_t gid{};
memcpy(gid.data, &request_header->request_id.writer_guid, sizeof(info->service.pub->pubiid));
TRACEPOINT(
rmw_take_request,
static_cast<const void *>(service),
static_cast<const void *>(ros_request),
gid.data,
(nullptr != request_header ? request_header->request_id.sequence_number : 0LL),
*taken);
return ret;
}
static rmw_ret_t rmw_send_response_request(
@ -3696,12 +3744,23 @@ extern "C" rmw_ret_t rmw_send_response(
dds_sleepfor(DDS_MSECS(10));
tnow = std::chrono::system_clock::now();
}
rmw_gid_t gid{};
const dds_time_t timestamp = dds_time();
switch (st) {
case client_present_t::FAILURE:
break;
case client_present_t::MAYBE:
return RMW_RET_TIMEOUT;
case client_present_t::YES:
// Do not use request_header->writer_guid, see the rmw_client_init tracepoint trigger
memcpy(gid.data, &header.guid, sizeof(header.guid));
TRACEPOINT(
rmw_send_response,
static_cast<const void *>(service),
static_cast<const void *>(ros_response),
gid.data,
header.seq,
timestamp);
return rmw_send_response_request(&info->service, header, ros_response);
case client_present_t::GONE:
return RMW_RET_OK;
@ -3734,6 +3793,11 @@ extern "C" rmw_ret_t rmw_send_request(
}
#endif
TRACEPOINT(
rmw_send_request,
static_cast<const void *>(client),
static_cast<const void *>(ros_request),
header.seq);
return rmw_send_response_request(&info->client, header, ros_request);
}
@ -3996,6 +4060,7 @@ extern "C" rmw_client_t * rmw_create_client(
const char * service_name,
const rmw_qos_profile_t * qos_policies)
{
rmw_gid_t gid{};
CddsClient * info = new CddsClient();
#if REPORT_BLOCKED_REQUESTS
info->lastcheck = 0;
@ -4035,7 +4100,12 @@ extern "C" rmw_client_t * rmw_create_client(
return nullptr;
}
}
// rmw_cyclonedds uses info->client.pub->pubiid as the internal request header GUID, which is
// the first half (8 bytes out of 16 bytes) of the rmw_request_id_t's writer_guid. The second
// half doesn't match when read from the client side and the service side, so only use the first
// half. The second half will be zeros on both client side and service side.
memcpy(gid.data, &info->client.pub->pubiid, sizeof(info->client.pub->pubiid));
TRACEPOINT(rmw_client_init, static_cast<const void *>(rmw_client), gid.data);
return rmw_client;
fail_service_name:
rmw_client_free(rmw_client);