{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import re\n",
"from tqdm import tqdm\n",
"from typing import Iterable\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"from matplotlib import pyplot as plt\n",
"\n",
"from misc.utils import cached, parse_as\n",
"\n",
"# TODO: This has no effect for some reason (goal is to make graphs in notebook large and hi-res)\n",
"plt.rcParams['figure.dpi'] = 300\n",
"\n",
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# User Settings"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"##################################################\n",
"# User Settings\n",
"##################################################\n",
"# Change these to influence the execution of the\n",
"# notebook.\n",
"# You can override these from the command line\n",
"# by defining environment variables with the\n",
"# name of the constants below, prefixed with\n",
"# \"ANA_NB_\".\n",
"# For example, the environment variable\n",
"# \"ANA_NB_TR_PATH\" will override the \"TR_PATH\"\n",
"# setting below.\n",
"##################################################\n",
"\n",
"# The path to the build folder of a ROS2 workspace that contains the\n",
"# tracetools_read and tracetools_analysis folders.\n",
"TRACING_WS_BUILD_PATH = \"/home/niklas/dataflow-analysis/dependencies/build\"\n",
"\n",
"# The name of the experiment being analyzed.\n",
"# This is used to name the output directory.\n",
"EXPERIMENT_NAME = \"edf_multi_timed_20-20250610091402\"\n",
"\n",
"# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n",
"# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n",
"# if kernel traces are also present.\n",
2025-06-06 08:24:47 +00:00
"TR_PATH = f\"/home/niklas/dataflow-analysis/traces/{EXPERIMENT_NAME}/ust\"\n",
"\n",
"# Path to the folder all artifacts from this notebook are saved to.\n",
"# This entails plots as well as data tables.\n",
2025-06-06 08:24:47 +00:00
"OUT_PATH = f\"out/{EXPERIMENT_NAME}/\"\n",
"\n",
"# Whether to cache the results of long computations per set of inputs\n",
"CACHING_ENABLED = False\n",
"\n",
"# Whether to annotate topics/publications with bandwidth/message size\n",
"BW_ENABLED = False\n",
"# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n",
"# Used to annotate message sizes in E2E latency calculations\n",
"BW_PATH = \"path/to/messages.h5\"\n",
"\n",
"# Whether to use dependencies extracted by the Clang-tools to supplement\n",
"# automatic node-internal data flow annotations.\n",
"# If in doubt, set to False.\n",
"CL_ENABLED = False\n",
"# Path to the output directory of the ROS2 dependency checker.\n",
"# Will only be used if CL_ENABLED is True.\n",
"CL_PATH = \"/path/to/code_analysis/output\"\n",
"\n",
"# Whether to compute data flow graphs.\n",
"# If you are only interested in E2E latencies, set this to False\n",
"DFG_ENABLED = True\n",
"# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n",
"DFG_PLOT = True\n",
"\n",
"# The maximum node namespace hierarchy level to be plotted.\n",
"# Top-level (1): e.g. /sensing, /control, etc.\n",
"# Level 3: e.g. /sensing/lidar/pointcloud_processor\n",
"DFG_MAX_HIER_LEVEL = 100\n",
"\n",
"# RegEx pattern for nodes that shall be marked as system inputs\n",
"# These will be plotted with a start arrow as known from automata diagrams\n",
"DFG_INPUT_NODE_PATTERNS = [r\"^/input_\"]\n",
"# RegEx pattern for nodes that shall be marked as system outputs\n",
"# These will be plotted with a double border\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/output_\"]\n",
"# RegEx for nodes which shall not be plotted in the DFG\n",
"DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\"]\n",
"\n",
"# Whether to compute E2E latencies.\n",
"E2E_ENABLED = True\n",
"# Whether to plot end-to-end latency information (ignored if E2E_ENABLED is False)\n",
"E2E_PLOT = True\n",
"# The index of the output message that shall be used in plots that visualize a specific\n",
"# message dependency tree. This index has to be 0 <= n < #output messages\n",
"E2E_PLOT_TIMESTAMP = 5\n",
"# E2E latency threshold. Every E2E latency higher than this is discarded.\n",
"# Set this as low as comfortably possible to speed up calculations.\n",
"# WARNING: If you set this too low (i.e. to E2E latencies that plausibly can occur),\n",
"# your results will be wrong.\n",
"E2E_TIME_LIMIT_S = 1000\n",
"\n",
"# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n",
"# E.g. r\"^/control/\" will cover all control topics\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/output/\"]\n",
"# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n",
"# E.g. r\"^/sensing/\" will cover all sensing topics\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/input/\"]\n",
"\n",
"# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n",
"# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n",
"E2E_EXCL_PATH_PATTERNS = [r\"^/parameter_events\"]\n",
"\n",
"# To specify paths of interest, topic/callback name patterns that HAVE TO OCCUR in each E2E path can be specified as RegEx here.\n",
"#E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n",
"E2E_INCL_PATH_PATTERNS = []\n",
"\n",
"# If an exact path through the system is known, this variabe can be set to a list (order matters!) of all elements on the path.\n",
"# The first item ist the one at the \"input end\" of the system, the last one the \"output end\".\n",
2025-06-03 13:43:48 +00:00
"E2E_EXACT_PATH = []\n",
2022-09-20 15:42:24 +02:00
"\n",
"# For development purposes only. Leave this at False.\n",
"DEBUG = False\n",
"\n",
"# For development purposes only. Leave this at False.\n",
"MANUAL_CACHE = False\n",
"\n",
"##################################################\n",
"# End of User Settings\n",
"##################################################\n",
"\n",
"# This code overrides the above constants with environment variables, with values from the environment.\n",
"# Values will be parsed through Python's literal_eval() function. Thus, strings have to be specified\n",
"# including \" or '.\n",
"for env_key, env_value in os.environ.items():\n",
" if env_key.startswith(\"ANA_NB_\"):\n",
" key = env_key.removeprefix(\"ANA_NB_\")\n",
" if key not in globals().keys():\n",
" continue\n",
" value = parse_as(type(globals()[key]), env_value)\n",
" globals()[key] = value\n",
"\n",
"\n",
"# Convert input paths to absolute paths\n",
"def _expand_path(path):\n",
" return os.path.realpath(os.path.expandvars(os.path.expanduser(path)))\n",
"\n",
"\n",
"TRACING_WS_BUILD_PATH = _expand_path(TRACING_WS_BUILD_PATH)\n",
"TR_PATH = _expand_path(TR_PATH)\n",
"OUT_PATH = _expand_path(OUT_PATH)\n",
"BW_PATH = _expand_path(BW_PATH)\n",
"CL_PATH = _expand_path(CL_PATH)\n",
"\n",
"os.makedirs(OUT_PATH, exist_ok=True)\n",
"\n",
"# Print parsed user settings\n",
"print(\"User Settings:\")\n",
"for k, v in globals().copy().items():\n",
" if not k.isupper():\n",
" continue\n",
" if isinstance(v, Iterable) and not isinstance(v, str):\n",
" v = (\"\\n \" + (\" \" * 44)).join(list(map(str, v)))\n",
" print(f\" {k:.<40s} := {v}\")"
]
},
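{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"The override loop above parses environment variable values via `parse_as` from `misc.utils`. Its exact implementation is project-specific; a minimal sketch of the assumed behavior (parse a Python literal and check it against the type of the default value):\n",
"\n",
"```python\n",
"from ast import literal_eval\n",
"\n",
"def parse_as(expected_type, value_str):\n",
"    # Parse the string as a Python literal (e.g. 'True', '[1, 2]', '\"text\"')\n",
"    value = literal_eval(value_str)\n",
"    if not isinstance(value, expected_type):\n",
"        raise TypeError(f\"expected {expected_type.__name__}, got {type(value).__name__}\")\n",
"    return value\n",
"```"
]
},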
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# The last few imports can only be resolved using the user-specified paths above\n",
"sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_read/\"))\n",
"sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_analysis/\"))\n",
"# noinspection PyUnresolvedReferences\n",
"from tracetools_read.trace import *\n",
"# noinspection PyUnresolvedReferences\n",
"from tracetools_analysis.loading import load_file\n",
"# noinspection PyUnresolvedReferences\n",
"from tracetools_analysis.processor.ros2 import Ros2Handler\n",
"\n",
"from tracing_interop.tr_types import *"
2023-02-15 17:02:20 +09:00
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Load Trace Data\n",
"\n",
"Load (and, if necessary, convert) tracing data obtained through ros2_tracing. Build indices for fast analysis."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def _load_traces():\n",
" file = load_file(TR_PATH)\n",
" handler = Ros2Handler.process(file)\n",
" return TrContext(handler)\n",
"\n",
"\n",
"_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], not CACHING_ENABLED)\n",
"_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n",
" \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n",
"\n",
"# Help the IDE recognize those identifiers by assigning a dummy value to their name.\n",
"nodes: Index = Index([])\n",
"publishers: Index = Index([])\n",
"subscriptions: Index = Index([])\n",
"timers: Index = Index([])\n",
"timer_node_links: Index = Index([])\n",
"subscription_objects: Index = Index([])\n",
"callback_objects: Index = Index([])\n",
"callback_symbols: Index = Index([])\n",
"publish_instances: Index = Index([])\n",
"callback_instances: Index = Index([])\n",
"topics: Index = Index([])\n",
"\n",
"for name in _tr_globals:\n",
" globals()[name] = getattr(_tracing_context, name)\n",
"\n",
"print(\"Done.\")"
]
},
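{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"`cached(name, fn, deps, disabled)` from `misc.utils` memoizes the result of `fn()` on disk, keyed by `name` and the given dependency paths. The real helper is project-specific; a minimal sketch of the assumed behavior:\n",
"\n",
"```python\n",
"import os\n",
"import pickle\n",
"\n",
"def cached(name, fn, dep_paths, disabled=False):\n",
"    if disabled:\n",
"        return fn()  # caching turned off: always recompute\n",
"    # Invalidate the cache whenever a dependency's modification time changes\n",
"    stamp = \"-\".join(str(int(os.path.getmtime(p))) for p in dep_paths)\n",
"    cache_file = os.path.join(\"cache\", f\"{name}_{stamp}.pkl\")\n",
"    if os.path.exists(cache_file):\n",
"        with open(cache_file, \"rb\") as f:\n",
"            return pickle.load(f)\n",
"    result = fn()\n",
"    os.makedirs(\"cache\", exist_ok=True)\n",
"    with open(cache_file, \"wb\") as f:\n",
"        pickle.dump(result, f)\n",
"    return result\n",
"```"
]
},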
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"##################################################\n",
"# Print (All/Input/Output) Topic Message Counts\n",
"##################################################\n",
"\n",
"for topic in sorted(topics, key=lambda t: t.name):\n",
" topic: TrTopic\n",
" print(f\"{topic.name:.<130s} | {sum(map(lambda p: len(p.instances), topic.publishers)):>5d} msgs\")\n",
"\n",
"print(\"\\n[DEBUG] INPUT TOPICS\")\n",
"for t in sorted(topics, key=lambda t: t.name):\n",
" for f in E2E_INPUT_TOPIC_PATTERNS:\n",
" if re.search(f, t.name):\n",
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")\n",
"\n",
"print(\"\\n[DEBUG] OUTPUT TOPICS\")\n",
"for t in sorted(topics, key=lambda t: t.name):\n",
" for f in E2E_OUTPUT_TOPIC_PATTERNS:\n",
" if re.search(f, t.name):\n",
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Analyze ROS Graph\n",
"Reconstruct namespace hierarchy, data flow graph between callbacks."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import latency_graph.latency_graph_structure as lg\n",
"\n",
"\n",
"def _make_latency_graph():\n",
" return lg.LatencyGraph(_tracing_context)\n",
"\n",
"\n",
"lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], not CACHING_ENABLED)"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"## Plot ROS Graph (full)\n",
"Plot the DFG hierarchically by node namespace. Plot each internal and external dependency between callbacks as one arrow."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"#################################################\n",
"# Plot full DFG, down to the callback level\n",
"#################################################\n",
"\n",
"from latency_graph.latency_graph_plots import plot_latency_graph_full\n",
"\n",
"plot_latency_graph_full(lat_graph, _tracing_context, os.path.join(OUT_PATH, \"latency_graph_full\"))"
]
},
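{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"`%%skip_if_false` is a custom cell magic used throughout this notebook: it runs the cell body only if the named settings flag is truthy. The magic is registered elsewhere in the project; a minimal sketch of how such a magic could look:\n",
"\n",
"```python\n",
"from IPython import get_ipython\n",
"from IPython.core.magic import register_cell_magic\n",
"\n",
"@register_cell_magic\n",
"def skip_if_false(line, cell):\n",
"    # Execute the cell body only if the flag named on the magic line is truthy.\n",
"    ip = get_ipython()\n",
"    if ip.user_ns.get(line.strip(), False):\n",
"        # run_cell re-dispatches any stacked %%skip_if_false lines inside the body\n",
"        ip.run_cell(cell)\n",
"```"
]
},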
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"## Plot Latency Graph (overview)\n",
"Plot the DFG down to a certain hierarchy level in a flattened manner. Aggregate dependencies from multiple callbacks into corresponding node-node dependencies."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%%%skip_if_false DFG_ENABLED\n"
}
},
"outputs": [],
"source": [
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"#################################################\n",
"# Plot overview of DFG, down to a certain\n",
"# hierarchy level\n",
"#################################################\n",
"\n",
"from latency_graph.latency_graph_plots import plot_latency_graph_overview\n",
"\n",
"print(f\"Plotting DFG overview with max hierarchy level {DFG_MAX_HIER_LEVEL}...\")\n",
"print(f\"Input Node Patterns: {DFG_INPUT_NODE_PATTERNS}\")\n",
"print(f\"Output Node Patterns: {DFG_OUTPUT_NODE_PATTERNS}\")\n",
"print(f\"Excluded Node Patterns: {DFG_EXCL_NODE_PATTERNS}\")\n",
"\n",
"plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, os.path.join(OUT_PATH, \"latency_graph_overview\"))"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Analyze Message Flow\n",
"Build dependency trees ending in the specified output topics."
]
},
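{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"The dependency trees are `DepTree` instances from `message_tree.message_tree_structure`. Based on how they are used below (`.head`, `.deps`), the structure is roughly the following sketch (the actual class may carry more fields):\n",
"\n",
"```python\n",
"from dataclasses import dataclass\n",
"from typing import Any, List\n",
"\n",
"@dataclass\n",
"class DepTree:\n",
"    head: Any              # TrPublishInstance or TrCallbackInstance at this node\n",
"    deps: List[\"DepTree\"]  # subtrees this head causally depends on\n",
"```"
]
},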
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from message_tree.message_tree_algorithms import build_dep_trees\n",
"\n",
"end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n",
"\n",
"print(f\"Found {len(end_topics)} end topics for E2E latency calculations: {', '.join(t.name for t in end_topics)}\")\n",
"\n",
"print(f\"Using {len(_tracing_context.callback_objects)} callback objects, {len(_tracing_context.callback_instances)} callback instances, and {len(_tracing_context.publish_instances)} publish instances for E2E latency calculations.\")\n",
"\n",
"def _build_dep_trees():\n",
" return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S)\n",
" #return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S, exact_path=E2E_EXACT_PATH)\n",
"\n",
"try:\n",
" trees = cached(\"trees\", _build_dep_trees, [TR_PATH], not CACHING_ENABLED)\n",
"except Exception as e:\n",
" import traceback\n",
" print(e)\n",
" traceback.print_exc()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false BW_ENABLED\n",
"\n",
"##################################################\n",
"# Get message bandwidths from `ros2 multitopic bw`\n",
"# output.\n",
"##################################################\n",
"\n",
"from bw_interop.process_bw_output import get_topic_messages\n",
"msgs = get_topic_messages(BW_PATH)\n",
"\n",
"from bw_interop.bw_plots import dds_lat_msg_size_scatter\n",
"plot_topic = \"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n",
"from message_tree.message_tree_algorithms import owner\n",
"from message_tree.message_tree_structure import DepTree\n",
"\n",
"# Set of already-seen publish topics, used to avoid printing duplicates\n",
"seen_publish_topics = set()\n",
"\n",
"def _print_elem(elem, indent=0):\n",
" if isinstance(elem, TrTopic):\n",
" print(f\"{' ' * indent}Topic: {elem.name}\")\n",
" elif isinstance(elem, TrCallbackObject):\n",
" print(f\"{' ' * indent}Callback: {elem.name} ({elem.elem.name})\")\n",
" elif isinstance(elem, TrPublishInstance):\n",
" #if elem.publisher.topic_name in seen_publish_topics:\n",
" # print(f\"{' ' * indent}Publish Instance: {elem.publisher.topic_name} (already seen)\")\n",
" # return False\n",
" #seen_publish_topics.add(elem.publisher.topic_name)\n",
" print(f\"{' ' * indent}Publish Instance: {elem.publisher.topic_name}\")\n",
" elif isinstance(elem, TrCallbackInstance):\n",
" print(f\"{' ' * indent}Callback Instance: {elem.callback_obj.callback_symbol.symbol}\")\n",
" else:\n",
" print(f\"{' ' * indent}Unknown element type: {type(elem)}\")\n",
" return True\n",
"\n",
"def _traverse_print_tree(tree, indent=0):\n",
" \"\"\"Recursively traverse a dependency tree and print its structure.\"\"\"\n",
" for i, node in enumerate(tree):\n",
" if isinstance(node, DepTree):\n",
" if not _print_elem(node.head, indent):\n",
" continue\n",
" _traverse_print_tree(node.deps, indent + 2)\n",
" elif isinstance(node, list):\n",
" for entry in node:\n",
" if entry is not None and isinstance(entry, DepTree):\n",
" if not _print_elem(entry.head, indent):\n",
" continue\n",
" _traverse_print_tree(entry.deps, indent + 2)\n",
" else:\n",
" print(f\"{' ' * indent}List of {len(node)} elements, but not a DepTree.\")\n",
" else:\n",
" if not _print_elem(node, indent):\n",
" continue\n",
"\n",
"def _print_trees():\n",
" print(\"Dependency Trees:\")\n",
" for i, tree in enumerate(trees):\n",
" print(f\"\\nTree {i + 1}/{len(trees)}:\")\n",
" _traverse_print_tree(tree)\n",
" print(\"\\nEnd of Dependency Trees\\n\")\n",
" print(f\"Published topics seen so far: {len(seen_publish_topics)}:, {', '.join(seen_publish_topics)}\")\n",
2025-06-06 08:24:47 +00:00
"#_print_trees()\n",
"\n",
"\n",
"##################################################\n",
"# Find and filter relevant E2E paths in trees\n",
"##################################################\n",
"trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0, desc=\"Extracting E2E paths\")]\n",
"all_paths = [p for paths in trees_paths for p in paths]\n",
"\n",
"def _print_paths():\n",
" for i, path in enumerate(all_paths):\n",
" print(f\"Path {i+1}/{len(all_paths)}:\")\n",
" for j, elem in enumerate(path):\n",
" if isinstance(elem, TrTopic):\n",
" print(f\" {j+1:>3d}: Topic: {elem.name}\")\n",
" elif isinstance(elem, TrCallbackObject):\n",
" print(f\" {j+1:>3d}: Callback: {elem.name} ({elem.node.name})\")\n",
" elif isinstance(elem, TrPublishInstance):\n",
" print(f\" {j+1:>3d}: Publish Instance: {elem.publisher.topic_name}\")\n",
" elif isinstance(elem, TrCallbackInstance):\n",
" print(f\" {j+1:>3d}: Callback Instance: {elem.callback_obj.callback_symbol.symbol}\")\n",
" else:\n",
" print(f\" {j+1:>3d}: Unknown element type: {elem}\")\n",
" print()\n",
"#_print_paths()\n",
"\n",
"print(f\"Found {len(all_paths)} E2E paths in total.\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from message_tree.message_tree_algorithms import aggregate_e2e_paths\n",
"\n",
"##################################################\n",
"# Group dataflows by DFG path\n",
"##################################################\n",
"\n",
"cohorts = aggregate_e2e_paths(all_paths)\n",
"print(f\"Found {len(cohorts)} cohorts of E2E paths.\")\n",
"cohort_pairs = [(k, v) for k, v in cohorts.items()]\n",
"cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n",
"\n",
"path_records = [{\"path\": path_key,\n",
" \"timestamp\": path[-1].timestamp,\n",
" \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n",
" for path_key, paths in cohort_pairs for path in paths if path]\n",
"\n",
"out_df = pd.DataFrame.from_records(path_records)\n",
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n",
"\n",
"df_print = out_df[['path', 'e2e_latency']].groupby(\"path\").agg(['count', 'mean', 'min', 'max']).reset_index()\n",
"#df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n",
"df_print = df_print.sort_values((\"e2e_latency\", \"count\"), ascending=False)\n",
"df_print.to_csv(os.path.join(OUT_PATH, \"e2e_overview.csv\"), sep=\"\\t\", index=False)\n",
"df_print"
]
},
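{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"`aggregate_e2e_paths` groups the concrete dataflow instances into cohorts that share the same path signature. Conceptually it behaves like the sketch below; the keying by topic name / callback symbol is an assumption, and the project's own implementation may differ:\n",
"\n",
"```python\n",
"from collections import defaultdict\n",
"\n",
"def _path_key(path):\n",
"    # Build the \"/topic -> symbol -> ...\" signature for one dataflow instance\n",
"    parts = []\n",
"    for elem in path:\n",
"        if isinstance(elem, TrTopic):\n",
"            parts.append(elem.name)\n",
"        elif isinstance(elem, TrPublishInstance):\n",
"            parts.append(elem.publisher.topic_name)\n",
"        elif isinstance(elem, TrCallbackInstance):\n",
"            parts.append(elem.callback_obj.callback_symbol.symbol)\n",
"    return \" -> \".join(parts)\n",
"\n",
"def aggregate_e2e_paths_sketch(paths):\n",
"    cohorts = defaultdict(list)\n",
"    for path in paths:\n",
"        cohorts[_path_key(path)].append(path)\n",
"    return cohorts\n",
"```"
]
},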
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import pickle\n",
"# with open(\"state.pkl\", \"wb\") as f:\n",
"# pickle.dump((trees_paths, all_paths, cohorts), f)\n",
"with open(os.path.join(OUT_PATH, \"state.pkl\"), \"wb\") as f:\n",
" pickle.dump((trees_paths, all_paths, cohorts), f)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import matplotlib.patches as mpatches\n",
"from collections import Counter\n",
"\n",
"from message_tree.message_tree_algorithms import e2e_latency_breakdown, label_latency_item\n",
"from message_tree.message_tree_plots import e2e_breakdown_stack\n",
"\n",
"##################################################\n",
"# 1. Filter DFG paths through must-be-included and cannot-be-included patterns\n",
"##################################################\n",
"\n",
"cohorts_filt = {\n",
" k: v for k, v in cohorts.items()\n",
" if all(re.search(f.removeprefix(\"^\").removesuffix(\"$\"), k) for f in E2E_INCL_PATH_PATTERNS)\n",
"}\n",
"\n",
"##################################################\n",
"# 2. Iterate through filtered DFG paths\n",
"##################################################\n",
"\n",
"result_strings = []\n",
"result_strings_csv = []\n",
"\n",
"for relevant_path, relevant_dataflows in cohorts_filt.items():\n",
" # Construct a filesystem-safe name for output files\n",
" topics = [topic for topic in relevant_path.split(\" -> \") if topic.startswith(\"/\") and not topic.startswith(\"/void\")]\n",
" name = f\"{topics[0][1:]}-{topics[-1][1:]}\"\n",
" name = name.replace(\"/\", \"_\").replace(\" \", \"_\").replace(\":\", \"_\")\n",
"\n",
" # Break down E2E latency for all dataflows on this path\n",
" e2e_breakdowns = list(map(e2e_latency_breakdown, relevant_dataflows))\n",
" flattened_breakdowns = [item for breakdown in e2e_breakdowns for item in breakdown]\n",
"\n",
" # Save information about the chosen path\n",
" with open(os.path.join(OUT_PATH, f\"plot_e2es_path_{name}.txt\"), \"w\") as f:\n",
" f.write(f\"Number of path instances: {len(relevant_dataflows)}\\n\")\n",
" f.write(\" \" + \"\\n -> \".join(relevant_path.split(\" -> \")))\n",
" f.write(\"\\n\")\n",
"\n",
" # Unique items for deduplicated analysis\n",
" conv_items_unique = set(flattened_breakdowns)\n",
"\n",
" ##################################################\n",
" # 3. Plot histograms of DDS/idle/cpu time breakdowns\n",
" ##################################################\n",
" def e2e_breakdown_type_hist(items):\n",
" \"\"\"\n",
" For all e2e breakdown items (unique) of the form `(\"<type>\", <duration>)`, plots a histogram for each encountered type (dds, idle, cpu).\n",
" \"\"\"\n",
" plot_types = (\"dds\", \"idle\", \"cpu\")\n",
" plt.close(f\"E2E type breakdown histograms {name}\\n{EXPERIMENT_NAME}\")\n",
" fig, axes = plt.subplots(1, 3, num=f\"E2E type breakdown histograms {name}\\n{EXPERIMENT_NAME}\", dpi=300, figsize=(16, 9))\n",
" fig.suptitle(f\"E2E Latency Breakdown by Resource Type {name}\\n{EXPERIMENT_NAME}\")\n",
"\n",
" for plot_type, ax in zip(plot_types, axes):\n",
" durations = [item.duration * 1000 for item in items if item.type == plot_type]\n",
" df = pd.Series(durations)\n",
" df.to_csv(\n",
" os.path.join(OUT_PATH, f\"plot_e2es_{plot_type}_portion_{name}.csv\"),\n",
" header=[f\"e2e_latency_{plot_type}_portion_ms\"],\n",
" index=False\n",
" )\n",
" ax.set_title(plot_type)\n",
" ax.hist(durations, bins=50)\n",
" ax.set_xlabel(\"Duration [ms]\")\n",
" ax.set_ylabel(\"Occurrences [#]\")\n",
" return fig\n",
"\n",
" fig = e2e_breakdown_type_hist(conv_items_unique)\n",
" plt.savefig(os.path.join(OUT_PATH, f\"plot_e2e_portions_{name}.png\"))\n",
"\n",
" ##################################################\n",
" # 4. Plot histogram of all E2E latencies observed\n",
" ##################################################\n",
" e2e_latencies = [(path[-1].timestamp - path[0].timestamp) * 1000 for path in relevant_dataflows]\n",
" pd.Series(e2e_latencies).to_csv(\n",
" os.path.join(OUT_PATH, f\"plot_e2es_{name}.csv\"),\n",
" index=False, header=[\"e2e_latency_ms\"]\n",
" )\n",
"\n",
" plt.close(f\"E2E histogram {name}\\n{EXPERIMENT_NAME}\")\n",
" fig, ax = plt.subplots(num=f\"E2E histogram {name}\\n{EXPERIMENT_NAME}\", dpi=300, figsize=(16, 9))\n",
" fig.suptitle(f\"E2E Latency Histogram {name}\\n{EXPERIMENT_NAME}\")\n",
" ax.hist(e2e_latencies, bins=30)\n",
" ax.set_xlabel(\"E2E Latency [ms]\")\n",
" ax.set_ylabel(\"Occurrences [#]\")\n",
" mean_latency = np.mean(e2e_latencies)\n",
" std_latency = np.std(e2e_latencies)\n",
" min_latency = np.min(e2e_latencies)\n",
" max_latency = np.max(e2e_latencies)\n",
" count_latencies = len(e2e_latencies)\n",
" ax.axvline(mean_latency, c=\"red\", linewidth=2)\n",
" _, max_ylim = ax.get_ylim()\n",
" # Create a multi-line string with all stats\n",
" stats_text = (\n",
" f\"Mean: {mean_latency:.2f} ms\\n\"\n",
" f\"Std: {std_latency:.2f} ms\\n\"\n",
" f\"Min: {min_latency:.2f} ms\\n\"\n",
" f\"Max: {max_latency:.2f} ms\\n\"\n",
" f\"Count: {count_latencies}\"\n",
" )\n",
" # Place text near top right of plot\n",
" ax.text(\n",
" mean_latency * 1.02, # or just use ax.get_xlim()[1]*0.98 for far right\n",
" max_ylim * 0.98, # near the top\n",
" stats_text,\n",
" verticalalignment='top',\n",
" horizontalalignment='left',\n",
" fontsize=10,\n",
" bbox=dict(facecolor='white', alpha=0.7, boxstyle='round,pad=0.3')\n",
" )\n",
" plt.savefig(os.path.join(OUT_PATH, f\"plot_e2es_{name}.png\"))\n",
" result_strings.append(f\"Chain {topics[0]} --> {topics[-1]} E2E stats: Mean: {mean_latency:.2f} ms, Std: {std_latency:.2f} ms, Min: {min_latency:.2f} ms, Max: {max_latency:.2f} ms, Count: {count_latencies}\")\n",
"    # Also append a CSV row in the order: experiment_name, chain, mean, std, min, max, count\n",
" result_strings_csv.append(\n",
" f\"{EXPERIMENT_NAME},{topics[0]} --> {topics[-1]},{mean_latency:.2f},{std_latency:.2f},{min_latency:.2f},{max_latency:.2f},{count_latencies}\"\n",
" )\n",
"\n",
" ##################################################\n",
" # 5. Violin plot of per-component/stage latencies on the DFG path (across all dataflows)\n",
" ##################################################\n",
" plt.close(\"E2E path breakdown\")\n",
" fig, ax = plt.subplots(num=f\"E2E path breakdown {name}\\n{EXPERIMENT_NAME}\", dpi=300, figsize=(16, 5))\n",
" fig.suptitle(f\"E2E Latency Path Breakdown {name}\\n{EXPERIMENT_NAME}\")\n",
"\n",
"    component_breakdowns = e2e_breakdowns  # Reuse the breakdowns already computed above\n",
" # Transpose: Each element in component_durations is all values for one component\n",
" component_durations = list(zip(*component_breakdowns))\n",
" component_latency_items = component_durations\n",
"\n",
" def extract_class_name(symbol: str) -> str:\n",
" # Try to extract class name in pattern: void(ClassName::ClassName...\n",
" m = re.search(r'void\\((\\w+)::', symbol)\n",
" if m:\n",
" return m.group(1)\n",
" # If pattern not found, fallback: find last word before first '('\n",
" m = re.search(r'(\\w+)\\s*\\(', symbol)\n",
" if m:\n",
" return m.group(1)\n",
" return symbol # fallback to original if all fails\n",
"\n",
" # For labeling, types etc (assume first dataflow is representative)\n",
" labels = [extract_class_name(label_latency_item(item)) for item in e2e_latency_breakdown(relevant_dataflows[0])]\n",
" print(f\"Labels: {labels}\")\n",
" types = [item.type for item in e2e_latency_breakdown(relevant_dataflows[0])]\n",
" # Convert durations from seconds to ms\n",
" component_durations_ms = [list(map(lambda item: item.duration * 1000, d)) for d in component_durations]\n",
"\n",
" legend_entries = []\n",
"\n",
" def add_label(violin, label):\n",
" color = violin[\"bodies\"][0].get_facecolor().flatten()\n",
" legend_entries.append((mpatches.Patch(color=color), label))\n",
"\n",
" #counter = Counter(types)\n",
" ## Print all unique elements and their count\n",
" #for element, count in counter.items():\n",
" # print(f'{element}: {count}')\n",
"\n",
" for comp_type in (\"idle\", \"dds\", \"cpu\"):\n",
" indices = [i for i, t in enumerate(types) if t == comp_type]\n",
" xs = [component_durations_ms[i] for i in indices]\n",
" if not xs:\n",
" continue\n",
" vln = ax.violinplot(xs, indices)\n",
" add_label(vln, comp_type)\n",
" # Export CSVs per component\n",
" for i, x in zip(indices, xs):\n",
" pd.Series(x).to_csv(\n",
" os.path.join(OUT_PATH, f\"plot_e2es_violin_{i:02d}.csv\"),\n",
" index=False, header=[\"duration_ms\"]\n",
" )\n",
" ax.set_ylabel(\"Latency contribution [ms]\")\n",
" ax.set_xticks(range(len(labels)), labels, rotation=90)\n",
" ax.legend(*zip(*legend_entries))\n",
" plt.savefig(os.path.join(OUT_PATH, f\"plot_e2es_violin_{name}.png\"))\n",
"\n",
" # Save labels and types\n",
" pd.Series(labels).to_csv(os.path.join(OUT_PATH, f\"plot_e2es_violin_labels_{name}.csv\"), index=False, header=[\"label\"])\n",
" pd.Series(types).to_csv(os.path.join(OUT_PATH, f\"plot_e2es_violin_types_{name}.csv\"), index=False, header=[\"type\"])\n",
"\n",
" ##################################################\n",
" # 6. Per-callback timestamp/duration records for relevant components (e.g., \"cpu\" stages)\n",
" ##################################################\n",
" for concat_pc_items in component_latency_items:\n",
" # Defensive check\n",
" if not isinstance(concat_pc_items[0].location[0], TrPublishInstance):\n",
" continue\n",
" records = [(item.location[0].timestamp, item.duration) for item in concat_pc_items]\n",
" pd.DataFrame(records, columns=(\"timestamp\", \"duration\")).to_csv(\n",
" os.path.join(OUT_PATH, f\"dur_ts_{concat_pc_items[0].location[0].publisher.topic_name.replace('/', '__')}_{name}.csv\"),\n",
" index=False\n",
" )\n",
"\n",
" ##################################################\n",
" # 7. Per-callback timing stats (mean, std, min, max) for \"cpu\" stages\n",
" ##################################################\n",
" cpu_indices = [i for i, t in enumerate(types) if t == \"cpu\"]\n",
" cpu_durations = [np.array([item.location[0].duration for item in component_latency_items[i]]) for i in cpu_indices]\n",
" cpu_labels = [labels[i] for i in cpu_indices]\n",
" stats = [(lbl, x.mean(), x.std(), x.min(), x.max()) for x, lbl in zip(cpu_durations, cpu_labels)]\n",
" pd.DataFrame(stats, columns=[\"callback\", \"mean\", \"std\", \"min\", \"max\"]).to_csv(\n",
" os.path.join(OUT_PATH, f\"calc_times_{name}.csv\"), index=False\n",
" )\n",
"\n",
" ##################################################\n",
" # 8. Stacked breakdown plot (from message_tree plots)\n",
" ##################################################\n",
" fig = e2e_breakdown_stack(*e2e_breakdowns, name=f\"{name}\\n{EXPERIMENT_NAME}\")\n",
" fig.set_size_inches(16, 9)\n",
" fig.set_dpi(300)\n",
"\n",
"for result_string in result_strings:\n",
" print(result_string)\n",
"\n",
"# append csv results to results file one dir up\n",
"with open(os.path.join(OUT_PATH, \"..\", \"..\", \"results.csv\"), \"a\") as f:\n",
" f.write(\"\\n\".join(result_strings_csv) + \"\\n\")\n",
"\n",
"print(\"Done.\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "venv310",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 2
}