Backport message flow tracing to foxy
This commit is contained in:
		
							parent
							
								
									c12abc5698
								
							
						
					
					
						commit
						90d9bc7296
					
				
					 3 changed files with 77 additions and 3 deletions
				
			
		|  | @ -27,6 +27,7 @@ endif() | |||
| 
 | ||||
| find_package(ament_cmake_ros REQUIRED) | ||||
| 
 | ||||
| find_package(tracetools REQUIRED) | ||||
| find_package(rcutils REQUIRED) | ||||
| find_package(rcpputils REQUIRED) | ||||
| 
 | ||||
|  | @ -44,6 +45,7 @@ find_package(rosidl_runtime_c REQUIRED) | |||
| find_package(rosidl_typesupport_introspection_c REQUIRED) | ||||
| find_package(rosidl_typesupport_introspection_cpp REQUIRED) | ||||
| 
 | ||||
| ament_export_dependencies(tracetools) | ||||
| ament_export_dependencies(rcutils) | ||||
| ament_export_dependencies(rcpputils) | ||||
| ament_export_dependencies(rmw) | ||||
|  | @ -84,6 +86,7 @@ endif() | |||
| 
 | ||||
| 
 | ||||
| ament_target_dependencies(rmw_cyclonedds_cpp | ||||
|   "tracetools" | ||||
|   "rcutils" | ||||
|   "rcpputils" | ||||
|   "rosidl_typesupport_introspection_c" | ||||
|  |  | |||
|  | @ -9,6 +9,7 @@ | |||
| 
 | ||||
|   <buildtool_depend>ament_cmake_ros</buildtool_depend> | ||||
| 
 | ||||
|   <depend>tracetools</depend> | ||||
|   <depend>cyclonedds</depend> | ||||
|   <depend>rcutils</depend> | ||||
|   <depend>rcpputils</depend> | ||||
|  |  | |||
|  | @ -74,6 +74,8 @@ | |||
| 
 | ||||
| #include "rosidl_typesupport_cpp/message_type_support.hpp" | ||||
| 
 | ||||
| #include "tracetools/tracetools.h" | ||||
| 
 | ||||
| #include "namespace_prefix.hpp" | ||||
| 
 | ||||
| #include "dds/dds.h" | ||||
|  | @ -1544,6 +1546,8 @@ extern "C" rmw_ret_t rmw_publish( | |||
|     return RMW_RET_INVALID_ARGUMENT); | ||||
|   auto pub = static_cast<CddsPublisher *>(publisher->data); | ||||
|   assert(pub); | ||||
|   const dds_time_t tstamp = dds_time(); | ||||
|   TRACEPOINT(rmw_publish, (const void *)publisher, ros_message, tstamp); | ||||
|   if (dds_write(pub->enth, ros_message) >= 0) { | ||||
|     return RMW_RET_OK; | ||||
|   } else { | ||||
|  | @ -1567,6 +1571,9 @@ extern "C" rmw_ret_t rmw_publish_serialized_message( | |||
|     serialized_message, "serialized message handle is null", | ||||
|     return RMW_RET_INVALID_ARGUMENT); | ||||
|   auto pub = static_cast<CddsPublisher *>(publisher->data); | ||||
|   const dds_time_t tstamp = dds_time(); | ||||
|   TRACEPOINT(rmw_publish, (const void *)publisher, serialized_message, tstamp); | ||||
| 
 | ||||
|   struct ddsi_serdata * d = serdata_rmw_from_serialized_message( | ||||
|     pub->sertopic, serialized_message->buffer, serialized_message->buffer_length); | ||||
|   const bool ok = (dds_writecdr(pub->enth, d) >= 0); | ||||
|  | @ -2062,6 +2069,7 @@ extern "C" rmw_publisher_t * rmw_create_publisher( | |||
|   } | ||||
| 
 | ||||
|   cleanup_publisher.cancel(); | ||||
|   TRACEPOINT(rmw_publisher_init, static_cast<const void *>(pub), cddspub->gid.data); | ||||
|   return pub; | ||||
| } | ||||
| 
 | ||||
|  | @ -2442,6 +2450,7 @@ extern "C" rmw_subscription_t * rmw_create_subscription( | |||
|   } | ||||
| 
 | ||||
|   cleanup_subscription.cancel(); | ||||
|   TRACEPOINT(rmw_subscription_init, static_cast<const void *>(sub), cddssub->gid.data); | ||||
|   return sub; | ||||
| } | ||||
| 
 | ||||
|  | @ -2597,10 +2606,17 @@ static rmw_ret_t rmw_take_int( | |||
|         fprintf(stderr, "** sample in history for %.fms\n", static_cast<double>(dt) / 1e6); | ||||
|       } | ||||
| #endif | ||||
|       return RMW_RET_OK; | ||||
|       goto take_done; | ||||
|     } | ||||
|   } | ||||
|   *taken = false; | ||||
| take_done: | ||||
|   TRACEPOINT( | ||||
|     rmw_take, | ||||
|     static_cast<const void *>(subscription), | ||||
|     static_cast<const void *>(ros_message), | ||||
|     (message_info ? message_info->source_timestamp : 0LL), | ||||
|     *taken); | ||||
|   return RMW_RET_OK; | ||||
| } | ||||
| 
 | ||||
|  | @ -2744,11 +2760,23 @@ static rmw_ret_t rmw_take_ser_int( | |||
|       serialized_message->buffer_length = d->size(); | ||||
|       ddsi_serdata_unref(dcmn); | ||||
|       *taken = true; | ||||
|       TRACEPOINT( | ||||
|         rmw_take, | ||||
|         static_cast<const void *>(subscription), | ||||
|         static_cast<const void *>(serialized_message), | ||||
|         (message_info ? message_info->source_timestamp : 0LL), | ||||
|         *taken); | ||||
|       return RMW_RET_OK; | ||||
|     } | ||||
|     ddsi_serdata_unref(dcmn); | ||||
|   } | ||||
|   *taken = false; | ||||
|   TRACEPOINT( | ||||
|     rmw_take, | ||||
|     static_cast<const void *>(subscription), | ||||
|     static_cast<const void *>(serialized_message), | ||||
|     0LL, | ||||
|     *taken); | ||||
|   return RMW_RET_OK; | ||||
| } | ||||
| 
 | ||||
|  | @ -3558,6 +3586,14 @@ extern "C" rmw_ret_t rmw_take_response( | |||
|     info->reqtime.erase(seq); | ||||
|   } | ||||
| #endif | ||||
|   TRACEPOINT( | ||||
|     rmw_take_response, | ||||
|     static_cast<const void *>(client), | ||||
|     static_cast<const void *>(ros_response), | ||||
|     (nullptr != request_header ? request_header->request_id.sequence_number : 0LL), | ||||
|     (nullptr != request_header ? request_header->source_timestamp : 0LL), | ||||
|     // rmw_take_response_request() will not take if taken==nullptr
 | ||||
|     (nullptr != taken ? *taken : false)); | ||||
|   return ret; | ||||
| } | ||||
| 
 | ||||
|  | @ -3589,9 +3625,21 @@ extern "C" rmw_ret_t rmw_take_request( | |||
|     service->implementation_identifier, eclipse_cyclonedds_identifier, | ||||
|     return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); | ||||
|   auto info = static_cast<CddsService *>(service->data); | ||||
|   return rmw_take_response_request( | ||||
| 
 | ||||
|   rmw_ret_t ret = rmw_take_response_request( | ||||
|     &info->service, request_header, ros_request, taken, nullptr, | ||||
|     false); | ||||
|   // Do not use the whole request_header->writer_guid, see the rmw_client_init tracepoint trigger
 | ||||
|   rmw_gid_t gid{}; | ||||
|   memcpy(gid.data, &request_header->request_id.writer_guid, sizeof(info->service.pub->pubiid)); | ||||
|   TRACEPOINT( | ||||
|     rmw_take_request, | ||||
|     static_cast<const void *>(service), | ||||
|     static_cast<const void *>(ros_request), | ||||
|     gid.data, | ||||
|     (nullptr != request_header ? request_header->request_id.sequence_number : 0LL), | ||||
|     *taken); | ||||
|   return ret; | ||||
| } | ||||
| 
 | ||||
| static rmw_ret_t rmw_send_response_request( | ||||
|  | @ -3696,12 +3744,23 @@ extern "C" rmw_ret_t rmw_send_response( | |||
|     dds_sleepfor(DDS_MSECS(10)); | ||||
|     tnow = std::chrono::system_clock::now(); | ||||
|   } | ||||
|   rmw_gid_t gid{}; | ||||
|   const dds_time_t timestamp = dds_time(); | ||||
|   switch (st) { | ||||
|     case client_present_t::FAILURE: | ||||
|       break; | ||||
|     case client_present_t::MAYBE: | ||||
|       return RMW_RET_TIMEOUT; | ||||
|     case client_present_t::YES: | ||||
|       // Do not use request_header->writer_guid, see the rmw_client_init tracepoint trigger
 | ||||
|       memcpy(gid.data, &header.guid, sizeof(header.guid)); | ||||
|       TRACEPOINT( | ||||
|         rmw_send_response, | ||||
|         static_cast<const void *>(service), | ||||
|         static_cast<const void *>(ros_response), | ||||
|         gid.data, | ||||
|         header.seq, | ||||
|         timestamp); | ||||
|       return rmw_send_response_request(&info->service, header, ros_response); | ||||
|     case client_present_t::GONE: | ||||
|       return RMW_RET_OK; | ||||
|  | @ -3734,6 +3793,11 @@ extern "C" rmw_ret_t rmw_send_request( | |||
|   } | ||||
| #endif | ||||
| 
 | ||||
|   TRACEPOINT( | ||||
|     rmw_send_request, | ||||
|     static_cast<const void *>(client), | ||||
|     static_cast<const void *>(ros_request), | ||||
|     header.seq); | ||||
|   return rmw_send_response_request(&info->client, header, ros_request); | ||||
| } | ||||
| 
 | ||||
|  | @ -3996,6 +4060,7 @@ extern "C" rmw_client_t * rmw_create_client( | |||
|   const char * service_name, | ||||
|   const rmw_qos_profile_t * qos_policies) | ||||
| { | ||||
|   rmw_gid_t gid{}; | ||||
|   CddsClient * info = new CddsClient(); | ||||
| #if REPORT_BLOCKED_REQUESTS | ||||
|   info->lastcheck = 0; | ||||
|  | @ -4035,7 +4100,12 @@ extern "C" rmw_client_t * rmw_create_client( | |||
|       return nullptr; | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   // rmw_cyclonedds uses info->client.pub->pubiid as the internal request header GUID, which is
 | ||||
|   // the first half (8 bytes out of 16 bytes) of the rmw_request_id_t's writer_guid. The second
 | ||||
|   // half doesn't match when read from the client side and the service side, so only use the first
 | ||||
|   // half. The second half will be zeros on both client side and service side.
 | ||||
|   memcpy(gid.data, &info->client.pub->pubiid, sizeof(info->client.pub->pubiid)); | ||||
|   TRACEPOINT(rmw_client_init, static_cast<const void *>(rmw_client), gid.data); | ||||
|   return rmw_client; | ||||
| fail_service_name: | ||||
|   rmw_client_free(rmw_client); | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue