From 79e07924d66b205838d28da4c9df6a941df6795c Mon Sep 17 00:00:00 2001 From: Niklas Halle Date: Sat, 17 May 2025 14:18:04 +0200 Subject: [PATCH] WIP: backport message flow tracing to foxy --- rmw_cyclonedds_cpp/CMakeLists.txt | 3 ++ rmw_cyclonedds_cpp/package.xml | 1 + rmw_cyclonedds_cpp/src/rmw_node.cpp | 76 +++++++++++++++++++++++++++-- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt index 4111a73..8d399f5 100644 --- a/rmw_cyclonedds_cpp/CMakeLists.txt +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -27,6 +27,7 @@ endif() find_package(ament_cmake_ros REQUIRED) +find_package(tracetools REQUIRED) find_package(rcutils REQUIRED) find_package(rcpputils REQUIRED) @@ -44,6 +45,7 @@ find_package(rosidl_runtime_c REQUIRED) find_package(rosidl_typesupport_introspection_c REQUIRED) find_package(rosidl_typesupport_introspection_cpp REQUIRED) +ament_export_dependencies(tracetools) ament_export_dependencies(rcutils) ament_export_dependencies(rcpputils) ament_export_dependencies(rmw) @@ -84,6 +86,7 @@ endif() ament_target_dependencies(rmw_cyclonedds_cpp + "tracetools" "rcutils" "rcpputils" "rosidl_typesupport_introspection_c" diff --git a/rmw_cyclonedds_cpp/package.xml b/rmw_cyclonedds_cpp/package.xml index 98ac06e..266ca6d 100644 --- a/rmw_cyclonedds_cpp/package.xml +++ b/rmw_cyclonedds_cpp/package.xml @@ -9,6 +9,7 @@ ament_cmake_ros + tracetools cyclonedds rcutils rcpputils diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 393e522..71ced30 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -74,6 +74,8 @@ #include "rosidl_typesupport_cpp/message_type_support.hpp" +#include "tracetools/tracetools.h" + #include "namespace_prefix.hpp" #include "dds/dds.h" @@ -1544,6 +1546,8 @@ extern "C" rmw_ret_t rmw_publish( return RMW_RET_INVALID_ARGUMENT); auto pub = static_cast(publisher->data); assert(pub); + const dds_time_t tstamp = dds_time(); + TRACEPOINT(rmw_publish, (const void *)publisher, ros_message, tstamp); if (dds_write(pub->enth, ros_message) >= 0) { return RMW_RET_OK; } else { @@ -1567,6 +1571,9 @@ extern "C" rmw_ret_t rmw_publish_serialized_message( serialized_message, "serialized message handle is null", return RMW_RET_INVALID_ARGUMENT); auto pub = static_cast(publisher->data); + const dds_time_t tstamp = dds_time(); + TRACEPOINT(rmw_publish, (const void *)publisher, serialized_message, tstamp); + struct ddsi_serdata * d = serdata_rmw_from_serialized_message( pub->sertopic, serialized_message->buffer, serialized_message->buffer_length); const bool ok = (dds_writecdr(pub->enth, d) >= 0); @@ -2062,6 +2069,7 @@ extern "C" rmw_publisher_t * rmw_create_publisher( } cleanup_publisher.cancel(); + TRACEPOINT(rmw_publisher_init, static_cast(pub), cddspub->gid.data); return pub; } @@ -2442,6 +2450,7 @@ extern "C" rmw_subscription_t * rmw_create_subscription( } cleanup_subscription.cancel(); + TRACEPOINT(rmw_subscription_init, static_cast(sub), cddssub->gid.data); return sub; } @@ -2597,10 +2606,17 @@ static rmw_ret_t rmw_take_int( fprintf(stderr, "** sample in history for %.fms\n", static_cast(dt) / 1e6); } #endif - return RMW_RET_OK; + goto take_done; } } *taken = false; +take_done: + TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(ros_message), + (message_info ? message_info->source_timestamp : 0LL), + *taken); return RMW_RET_OK; } @@ -2744,11 +2760,23 @@ static rmw_ret_t rmw_take_ser_int( serialized_message->buffer_length = d->size(); ddsi_serdata_unref(dcmn); *taken = true; + TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(serialized_message), + (message_info ? message_info->source_timestamp : 0LL), + *taken); return RMW_RET_OK; } ddsi_serdata_unref(dcmn); } *taken = false; + TRACEPOINT( + rmw_take, + static_cast(subscription), + static_cast(serialized_message), + 0LL, + *taken); return RMW_RET_OK; } @@ -3558,6 +3586,14 @@ extern "C" rmw_ret_t rmw_take_response( info->reqtime.erase(seq); } #endif + TRACEPOINT( + rmw_take_response, + static_cast(client), + static_cast(ros_response), + (nullptr != request_header ? request_header->request_id.sequence_number : 0LL), + (nullptr != request_header ? request_header->source_timestamp : 0LL), + // rmw_take_response_request() will not take if taken==nullptr + (nullptr != taken ? *taken : false)); return ret; } @@ -3589,9 +3625,21 @@ extern "C" rmw_ret_t rmw_take_request( service->implementation_identifier, eclipse_cyclonedds_identifier, return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); auto info = static_cast(service->data); - return rmw_take_response_request( + + rmw_ret_t ret = rmw_take_response_request( &info->service, request_header, ros_request, taken, nullptr, false); + // Do not use the whole request_header->writer_guid, see the rmw_client_init tracepoint trigger + rmw_gid_t gid{}; + memcpy(gid.data, &request_header->request_id.writer_guid, sizeof(info->service.pub->pubiid)); + TRACEPOINT( + rmw_take_request, + static_cast(service), + static_cast(ros_request), + gid.data, + (nullptr != request_header ? request_header->request_id.sequence_number : 0LL), + *taken); + return ret; } static rmw_ret_t rmw_send_response_request( @@ -3696,12 +3744,23 @@ extern "C" rmw_ret_t rmw_send_response( dds_sleepfor(DDS_MSECS(10)); tnow = std::chrono::system_clock::now(); } + rmw_gid_t gid{}; + const dds_time_t timestamp = dds_time(); switch (st) { case client_present_t::FAILURE: break; case client_present_t::MAYBE: return RMW_RET_TIMEOUT; case client_present_t::YES: + // Do not use request_header->writer_guid, see the rmw_client_init tracepoint trigger + memcpy(gid.data, &header.guid, sizeof(header.guid)); + TRACEPOINT( + rmw_send_response, + static_cast(service), + static_cast(ros_response), + gid.data, + header.seq, + timestamp); return rmw_send_response_request(&info->service, header, ros_response); case client_present_t::GONE: return RMW_RET_OK; @@ -3734,6 +3793,11 @@ extern "C" rmw_ret_t rmw_send_request( } #endif + TRACEPOINT( + rmw_send_request, + static_cast(client), + static_cast(ros_request), + header.seq); return rmw_send_response_request(&info->client, header, ros_request); } @@ -3996,6 +4060,7 @@ extern "C" rmw_client_t * rmw_create_client( const char * service_name, const rmw_qos_profile_t * qos_policies) { + rmw_gid_t gid{}; CddsClient * info = new CddsClient(); #if REPORT_BLOCKED_REQUESTS info->lastcheck = 0; @@ -4035,7 +4100,12 @@ extern "C" rmw_client_t * rmw_create_client( return nullptr; } } - + // rmw_cyclonedds uses info->client.pub->pubiid as the internal request header GUID, which is + // the first half (8 bytes out of 16 bytes) of the rmw_request_id_t's writer_guid. The second + // half doesn't match when read from the client side and the service side, so only use the first + // half. The second half will be zeros on both client side and service side. + memcpy(gid.data, &info->client.pub->pubiid, sizeof(info->client.pub->pubiid)); + TRACEPOINT(rmw_client_init, static_cast(rmw_client), gid.data); return rmw_client; fail_service_name: rmw_client_free(rmw_client);