{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import os\n", "import sys\n", "import re\n", "from tqdm import tqdm\n", "from typing import Iterable\n", "\n", "import numpy as np\n", "import pandas as pd\n", "from matplotlib import pyplot as plt\n", "\n", "from misc.utils import cached, parse_as\n", "\n", "# TODO: This has no effect for some reason (goal is to make graphs in notebook large and hi-res)\n", "plt.rcParams['figure.dpi'] = 300\n", "\n", "%matplotlib inline" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "# User Settings" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "##################################################\n", "# User Settings\n", "##################################################\n", "# Change these to influence the execution of the\n", "# notebook.\n", "# You can override these from the command line\n", "# by defining environment variables with the\n", "# name of the constants below, prefixed with\n", "# \"ANA_NB_\".\n", "# For example, the environment variable\n", "# \"ANA_NB_TR_PATH\" will override the \"TR_PATH\"\n", "# setting below.\n", "##################################################\n", "\n", "# The path to the build folder of a ROS2 workspace that contains the\n", "# tracetools_read and tracetools_analysis folders.\n", "TRACING_WS_BUILD_PATH = \"/home/niklas/dataflow-analysis/dependencies/build\"\n", "\n", "# The name of the experiment that is being analyzed.\n", "# This will be used to name output directory.\n", "EXPERIMENT_NAME = \"edf_multi_timed_20-20250610091402\"\n", "\n", "# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n", "# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n", "# if kernel traces are also present.\n", "TR_PATH = f\"/home/niklas/dataflow-analysis/traces/{EXPERIMENT_NAME}/ust\"\n", "\n", "# Path to the folder all artifacts from this notebook are saved to.\n", "# This entails plots as well as data tables.\n", "OUT_PATH = f\"out/{EXPERIMENT_NAME}/\"\n", "\n", "# Whether to cache the results of long computations per set of inputs\n", "CACHING_ENABLED = False\n", "\n", "# Whether to annotate topics/publications with bandwidth/message size\n", "BW_ENABLED = False\n", "# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n", "# Used to annotate message sizes in E2E latency calculations\n", "BW_PATH = \"path/to/messages.h5\"\n", "\n", "# Whether to use dependencies extracted by the Clang-tools to supplement\n", "# automatic node-internal data flow annotations.\n", "# If in doubt, set to False.\n", "CL_ENABLED = False\n", "# Path to the output directory of the ROS2 dependency checker.\n", "# Will only be used if CL_ENABLED is True.\n", "CL_PATH = \"/path/to/code_analysis/output\"\n", "\n", "# Whether to compute data flow graphs.\n", "# If you are only interested in E2E latencies, set this to False\n", "DFG_ENABLED = True\n", "# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n", "DFG_PLOT = True\n", "\n", "# The maximum node namespace hierarchy level to be plotted.\n", "# Top-level (1): e.g. /sensing, /control, etc.\n", "# Level 3: e.g. /sensing/lidar/pointcloud_processor\n", "DFG_MAX_HIER_LEVEL = 100\n", "\n", "# RegEx pattern for nodes that shall be marked as system inputs\n", "# These will be plotted with a start arrow as known from automata diagrams\n", "DFG_INPUT_NODE_PATTERNS = [r\"^/input_\"]\n", "# RegEx pattern for nodes that shall be marked as system outputs\n", "# These will be plotted with a double border\n", "DFG_OUTPUT_NODE_PATTERNS = [r\"^/output_\"]\n", "# RegEx for nodes which shall not be plotted in the DFG\n", "DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\"]\n", "\n", "# Whether to compute E2E latencies.\n", "E2E_ENABLED = True\n", "# Whether to plot end-to-end latency information (ignored if E2E_ENABLED is False)\n", "E2E_PLOT = True\n", "# The index of the output message that shall be used in plots that visualize a specific\n", "# message dependency tree. This index has to be 0 <= n < #output messages\n", "E2E_PLOT_TIMESTAMP = 5\n", "# E2E latency threshold. Every E2E latency higher than this is discarded.\n", "# Set this as low as comfortably possible to speed up calculations.\n", "# WARNING: If you set this too low (i.e. to E2E latencies that plausibly can happen)\n", "# your results will be wrong)\n", "E2E_TIME_LIMIT_S = 1000\n", "\n", "# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n", "# E.g. r\"^/control/\" will cover all control topics\n", "E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/output/\"]\n", "# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n", "# E.g. r\"^/sensing/\" will cover all sensing topics\n", "E2E_INPUT_TOPIC_PATTERNS = [r\"^/input/\"]\n", "\n", "# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n", "# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n", "E2E_EXCL_PATH_PATTERNS = [r\"^/parameter_events\"]\n", "\n", "# To specify paths of interest, topic/callback name patterns that HAVE TO OCCUR in each E2E path can be specified as RegEx here.\n", "#E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n", "E2E_INCL_PATH_PATTERNS = []\n", "\n", "# If an exact path through the system is known, this variabe can be set to a list (order matters!) of all elements on the path.\n", "# The first item ist the one at the \"input end\" of the system, the last one the \"output end\".\n", "E2E_EXACT_PATH = []\n", "\n", "# For development purposes only. Leave this at False.\n", "DEBUG = False\n", "\n", "# For development purposes only. Leave this at False.\n", "MANUAL_CACHE = False\n", "\n", "##################################################\n", "# End of User Settings\n", "##################################################\n", "\n", "# This code overrides the above constants with environment variables, with values from the environment.\n", "# Values will be parsed through Python's literal_eval() function. Thus, strings have to be specified\n", "# including \" or '.\n", "for env_key, env_value in os.environ.items():\n", " if env_key.startswith(\"ANA_NB_\"):\n", " key = env_key.removeprefix(\"ANA_NB_\")\n", " if key not in globals().keys():\n", " continue\n", " value = parse_as(type(globals()[key]), env_value)\n", " globals()[key] = value\n", "\n", "\n", "# Convert input paths to absolute paths\n", "def _expand_path(path):\n", " return os.path.realpath(os.path.expandvars(os.path.expanduser(path)))\n", "\n", "\n", "TRACING_WS_BUILD_PATH = _expand_path(TRACING_WS_BUILD_PATH)\n", "TR_PATH = _expand_path(TR_PATH)\n", "OUT_PATH = _expand_path(OUT_PATH)\n", "BW_PATH = _expand_path(BW_PATH)\n", "CL_PATH = _expand_path(CL_PATH)\n", "\n", "os.makedirs(OUT_PATH, exist_ok=True)\n", "\n", "# Print parsed user settings\n", "print(\"User Settings:\")\n", "for k, v in globals().copy().items():\n", " if not k.isupper():\n", " continue\n", " if isinstance(v, Iterable) and not isinstance(v, str):\n", " v = (\"\\n \" + (\" \" * 44)).join(list(map(str, v)))\n", " print(f\" {k:.<40s} := {v}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "# The last few imports can only be resolved using the user-specified paths above\n", "sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_read/\"))\n", "sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_analysis/\"))\n", "# noinspection PyUnresolvedReferences\n", "from tracetools_read.trace import *\n", "# noinspection PyUnresolvedReferences\n", "from tracetools_analysis.loading import load_file\n", "# noinspection PyUnresolvedReferences\n", "from tracetools_analysis.processor.ros2 import Ros2Handler\n", "\n", "from tracing_interop.tr_types import *" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "# Load Trace Data\n", "\n", "Load (and, if necessary, convert) tracing data obtained through ros2_tracing. Build indices for fast analysis." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def _load_traces():\n", " file = load_file(TR_PATH)\n", " handler = Ros2Handler.process(file)\n", " return TrContext(handler)\n", "\n", "\n", "_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], not CACHING_ENABLED)\n", "_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n", " \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n", "\n", "# Help the IDE recognize those identifiers by assigning a dummy value to their name.\n", "nodes: Index = Index([])\n", "publishers: Index = Index([])\n", "subscriptions: Index = Index([])\n", "timers: Index = Index([])\n", "timer_node_links: Index = Index([])\n", "subscription_objects: Index = Index([])\n", "callback_objects: Index = Index([])\n", "callback_symbols: Index = Index([])\n", "publish_instances: Index = Index([])\n", "callback_instances: Index = Index([])\n", "topics: Index = Index([])\n", "\n", "for name in _tr_globals:\n", " globals()[name] = getattr(_tracing_context, name)\n", "\n", "print(\"Done.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "##################################################\n", "# Print (All/Input/Output) Topic Message Counts\n", "##################################################\n", "\n", "for topic in sorted(topics, key=lambda t: t.name):\n", " topic: TrTopic\n", " print(f\"{topic.name:.<130s} | {sum(map(lambda p: len(p.instances), topic.publishers)):>5d} msgs\")\n", "\n", "print(\"\\n[DEBUG] INPUT TOPICS\")\n", "for t in sorted(topics, key=lambda t: t.name):\n", " for f in E2E_INPUT_TOPIC_PATTERNS:\n", " if re.search(f, t.name):\n", " print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")\n", "\n", "print(\"\\n[DEBUG] OUTPUT TOPICS\")\n", "for t in sorted(topics, key=lambda t: t.name):\n", " for f in E2E_OUTPUT_TOPIC_PATTERNS:\n", " if re.search(f, t.name):\n", " print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "# Analyze ROS Graph\n", "Reconstruct namespace hierarchy, data flow graph between callbacks." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import latency_graph.latency_graph_structure as lg\n", "\n", "\n", "def _make_latency_graph():\n", " return lg.LatencyGraph(_tracing_context)\n", "\n", "\n", "lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], not CACHING_ENABLED)" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "## Plot ROS Graph (full)\n", "Plot the DFG hierarchically by node namespace. Plot each internal and external dependency between callbacks as one arrow." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%skip_if_false DFG_ENABLED\n", "%%skip_if_false DFG_PLOT\n", "\n", "#################################################\n", "# Plot full DFG, down to the callback level\n", "#################################################\n", "\n", "from latency_graph.latency_graph_plots import plot_latency_graph_full\n", "\n", "plot_latency_graph_full(lat_graph, _tracing_context, os.path.join(OUT_PATH, \"latency_graph_full\"))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "## Plot Latency Graph (overview)\n", "Plot the DFG down to a certain hierarchy level in a flattened manner. Aggregate dependencies from multiple callbacks into corresponding node-node dependencies." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "pycharm": { "name": "#%%%%skip_if_false DFG_ENABLED\n" } }, "outputs": [], "source": [ "%%skip_if_false DFG_ENABLED\n", "%%skip_if_false DFG_PLOT\n", "\n", "#################################################\n", "# Plot overview of DFG, down to a certain\n", "# hierarchy level\n", "#################################################\n", "\n", "from latency_graph.latency_graph_plots import plot_latency_graph_overview\n", "\n", "print(f\"Plotting DFG overview with max hierarchy level {DFG_MAX_HIER_LEVEL}...\")\n", "print(f\"Input Node Patterns: {DFG_INPUT_NODE_PATTERNS}\")\n", "print(f\"Output Node Patterns: {DFG_OUTPUT_NODE_PATTERNS}\")\n", "print(f\"Excluded Node Patterns: {DFG_EXCL_NODE_PATTERNS}\")\n", "\n", "plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, os.path.join(OUT_PATH, \"latency_graph_overview\"))" ] }, { "cell_type": "markdown", "metadata": { "collapsed": false }, "source": [ "# Analyze Message Flow\n", "Build dependency trees ending in the specified output topics." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%skip_if_false E2E_ENABLED\n", "\n", "from message_tree.message_tree_algorithms import build_dep_trees\n", "\n", "end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n", "\n", "print(f\"Found {len(end_topics)} end topics for E2E latency calculations: {', '.join(t.name for t in end_topics)}\")\n", "\n", "print(f\"Using {len(_tracing_context.callback_objects)} callback objects, {len(_tracing_context.callback_instances)} callback instances, and {len(_tracing_context.publish_instances)} publish instances for E2E latency calculations.\")\n", "\n", "def _build_dep_trees():\n", " return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S)\n", " #return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S, exact_path=E2E_EXACT_PATH)\n", "\n", "try:\n", " trees = cached(\"trees\", _build_dep_trees, [TR_PATH], not CACHING_ENABLED)\n", "except Exception as e:\n", " import traceback\n", " print(e)\n", " traceback.print_exc()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false BW_ENABLED\n", "\n", "##################################################\n", "# Get message bandwidths from `ros2 multitopic bw`\n", "# output.\n", "##################################################\n", "\n", "from bw_interop.process_bw_output import get_topic_messages\n", "msgs = get_topic_messages(BW_PATH)\n", "\n", "from bw_interop.bw_plots import dds_lat_msg_size_scatter\n", "plot_topic = \"\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%skip_if_false E2E_ENABLED\n", "\n", "from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n", "from message_tree.message_tree_algorithms import owner\n", "from message_tree.message_tree_structure import DepTree\n", "\n", "# set of seen publush topics to avoid duplicates\n", "seen_publish_topics = set()\n", "\n", "def _print_elem(elem, indent=0):\n", " if isinstance(elem, TrTopic):\n", " print(f\"{' ' * indent}Topic: {elem.name}\")\n", " elif isinstance(elem, TrCallbackObject):\n", " print(f\"{' ' * indent}Callback: {elem.name} ({elem.elem.name})\")\n", " elif isinstance(elem, TrPublishInstance):\n", " #if elem.publisher.topic_name in seen_publish_topics:\n", " # print(f\"{' ' * indent}Publish Instance: {elem.publisher.topic_name} (already seen)\")\n", " # return False\n", " #seen_publish_topics.add(elem.publisher.topic_name)\n", " print(f\"{' ' * indent}Publish Instance: {elem.publisher.topic_name}\")\n", " elif isinstance(elem, TrCallbackInstance):\n", " print(f\"{' ' * indent}Callback Instance: {elem.callback_obj.callback_symbol.symbol}\")\n", " else:\n", " print(f\"{' ' * indent}Unknown element type: {type(elem)}\")\n", " return True\n", "\n", "def _traverse_print_tree(tree, indent=0):\n", " \"\"\"Recursively traverse a dependency tree and print its structure.\"\"\"\n", " for i, node in enumerate(tree):\n", " if isinstance(node, DepTree):\n", " if not _print_elem(node.head, indent):\n", " continue\n", " _traverse_print_tree(node.deps, indent + 2)\n", " elif isinstance(node, list):\n", " for entry in node:\n", " if entry is not None and isinstance(entry, DepTree):\n", " if not _print_elem(entry.head, indent):\n", " continue\n", " _traverse_print_tree(entry.deps, indent + 2)\n", " else:\n", " print(f\"{' ' * indent}List of {len(node)} elements, but not a DepTree.\")\n", " else:\n", " if not _print_elem(node, indent):\n", " continue\n", "\n", "def _print_trees():\n", " print(\"Dependency Trees:\")\n", " for i, tree in enumerate(trees):\n", " print(f\"\\nTree {i + 1}/{len(trees)}:\")\n", " _traverse_print_tree(tree)\n", " print(\"\\nEnd of Dependency Trees\\n\")\n", " print(f\"Published topics seen so far: {len(seen_publish_topics)}:, {', '.join(seen_publish_topics)}\")\n", "#_print_trees()\n", "\n", "\n", "##################################################\n", "# Find and filter relevant E2E paths in trees\n", "##################################################\n", "trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0, desc=\"Extracting E2E paths\")]\n", "all_paths = [p for paths in trees_paths for p in paths]\n", "\n", "def _print_paths():\n", " for i, path in enumerate(all_paths):\n", " print(f\"Path {i+1}/{len(all_paths)}:\")\n", " for j, elem in enumerate(path):\n", " if isinstance(elem, TrTopic):\n", " print(f\" {j+1:>3d}: Topic: {elem.name}\")\n", " elif isinstance(elem, TrCallbackObject):\n", " print(f\" {j+1:>3d}: Callback: {elem.name} ({elem.node.name})\")\n", " elif isinstance(elem, TrPublishInstance):\n", " print(f\" {j+1:>3d}: Publish Instance: {elem.publisher.topic_name}\")\n", " elif isinstance(elem, TrCallbackInstance):\n", " print(f\" {j+1:>3d}: Callback Instance: {elem.callback_obj.callback_symbol.symbol}\")\n", " else:\n", " print(f\" {j+1:>3d}: Unknown element type: {elem}\")\n", " print()\n", "#_print_paths()\n", "\n", "print(f\"Found {len(all_paths)} E2E paths in total.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "%%skip_if_false E2E_ENABLED\n", "\n", "from message_tree.message_tree_algorithms import aggregate_e2e_paths\n", "\n", "##################################################\n", "# Group dataflows by DFG path\n", "##################################################\n", "\n", "cohorts = aggregate_e2e_paths(all_paths)\n", "print(f\"Found {len(cohorts)} cohorts of E2E paths.\")\n", "cohort_pairs = [(k, v) for k, v in cohorts.items()]\n", "cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n", "\n", "path_records = [{\"path\": path_key,\n", " \"timestamp\": path[-1].timestamp,\n", " \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n", " for path_key, paths in cohort_pairs for path in paths if path]\n", "\n", "out_df = pd.DataFrame.from_records(path_records)\n", "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n", "\n", "df_print = out_df[['path', 'e2e_latency']].groupby(\"path\").agg(['count', 'mean', 'min', 'max']).reset_index()\n", "#df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n", "df_print = df_print.sort_values((\"e2e_latency\", \"count\"), ascending=False)\n", "df_print.to_csv(os.path.join(OUT_PATH, \"e2e_overview.csv\"), sep=\"\\t\", index=False)\n", "df_print" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import pickle\n", "# with open(\"state.pkl\", \"wb\") as f:\n", "# pickle.dump((trees_paths, all_paths, cohorts), f)\n", "with open(os.path.join(OUT_PATH, \"state.pkl\"), \"wb\") as f:\n", " pickle.dump((trees_paths, all_paths, cohorts), f)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import matplotlib.patches as mpatches\n", "from collections import Counter\n", "\n", "from message_tree.message_tree_algorithms import e2e_latency_breakdown, label_latency_item\n", "from message_tree.message_tree_plots import e2e_breakdown_stack\n", "\n", "##################################################\n", "# 1. Filter DFG paths through must-be-included and cannot-be-included patterns\n", "##################################################\n", "\n", "cohorts_filt = {\n", " k: v for k, v in cohorts.items()\n", " if all(re.search(f.removeprefix(\"^\").removesuffix(\"$\"), k) for f in E2E_INCL_PATH_PATTERNS)\n", "}\n", "\n", "##################################################\n", "# 2. Iterate through filtered DFG paths\n", "##################################################\n", "\n", "result_strings = []\n", "result_strings_csv = []\n", "\n", "for relevant_path, relevant_dataflows in cohorts_filt.items():\n", " # Construct a filesystem-safe name for output files\n", " topics = [topic for topic in relevant_path.split(\" -> \") if topic.startswith(\"/\") and not topic.startswith(\"/void\")]\n", " name = f\"{topics[0][1:]}-{topics[-1][1:]}\"\n", " name = name.replace(\"/\", \"_\").replace(\" \", \"_\").replace(\":\", \"_\")\n", "\n", " # Break down E2E latency for all dataflows on this path\n", " e2e_breakdowns = list(map(e2e_latency_breakdown, relevant_dataflows))\n", " flattened_breakdowns = [item for breakdown in e2e_breakdowns for item in breakdown]\n", "\n", " # Save information about the chosen path\n", " with open(os.path.join(OUT_PATH, f\"plot_e2es_path_{name}.txt\"), \"w\") as f:\n", " f.write(f\"Number of path instances: {len(relevant_dataflows)}\\n\")\n", " f.write(\" \" + \"\\n -> \".join(relevant_path.split(\" -> \")))\n", " f.write(\"\\n\")\n", "\n", " # Unique items for deduplicated analysis\n", " conv_items_unique = set(flattened_breakdowns)\n", "\n", " ##################################################\n", " # 3. Plot histograms of DDS/idle/cpu time breakdowns\n", " ##################################################\n", " def e2e_breakdown_type_hist(items):\n", " \"\"\"\n", " For all e2e breakdown items (unique) of the form `(\"\", )`, plots a histogram for each encountered type (dds, idle, cpu).\n", " \"\"\"\n", " plot_types = (\"dds\", \"idle\", \"cpu\")\n", " plt.close(f\"E2E type breakdown histograms {name}\\n{EXPERIMENT_NAME}\")\n", " fig, axes = plt.subplots(1, 3, num=f\"E2E type breakdown histograms {name}\\n{EXPERIMENT_NAME}\", dpi=300, figsize=(16, 9))\n", " fig.suptitle(f\"E2E Latency Breakdown by Resource Type {name}\\n{EXPERIMENT_NAME}\")\n", "\n", " for plot_type, ax in zip(plot_types, axes):\n", " durations = [item.duration * 1000 for item in items if item.type == plot_type]\n", " df = pd.Series(durations)\n", " df.to_csv(\n", " os.path.join(OUT_PATH, f\"plot_e2es_{plot_type}_portion_{name}.csv\"),\n", " header=[f\"e2e_latency_{plot_type}_portion_ms\"],\n", " index=False\n", " )\n", " ax.set_title(plot_type)\n", " ax.hist(durations, bins=50)\n", " ax.set_xlabel(\"Duration [ms]\")\n", " ax.set_ylabel(\"Occurrences [#]\")\n", " return fig\n", "\n", " fig = e2e_breakdown_type_hist(conv_items_unique)\n", " plt.savefig(os.path.join(OUT_PATH, f\"plot_e2e_portions_{name}.png\"))\n", "\n", " ##################################################\n", " # 4. Plot histogram of all E2E latencies observed\n", " ##################################################\n", " e2e_latencies = [(path[-1].timestamp - path[0].timestamp) * 1000 for path in relevant_dataflows]\n", " pd.Series(e2e_latencies).to_csv(\n", " os.path.join(OUT_PATH, f\"plot_e2es_{name}.csv\"),\n", " index=False, header=[\"e2e_latency_ms\"]\n", " )\n", "\n", " plt.close(f\"E2E histogram {name}\\n{EXPERIMENT_NAME}\")\n", " fig, ax = plt.subplots(num=f\"E2E histogram {name}\\n{EXPERIMENT_NAME}\", dpi=300, figsize=(16, 9))\n", " fig.suptitle(f\"E2E Latency Histogram {name}\\n{EXPERIMENT_NAME}\")\n", " ax.hist(e2e_latencies, bins=30)\n", " ax.set_xlabel(\"E2E Latency [ms]\")\n", " ax.set_ylabel(\"Occurrences [#]\")\n", " mean_latency = np.mean(e2e_latencies)\n", " std_latency = np.std(e2e_latencies)\n", " min_latency = np.min(e2e_latencies)\n", " max_latency = np.max(e2e_latencies)\n", " ax.axvline(mean_latency, c=\"red\", linewidth=2)\n", " _, max_ylim = ax.get_ylim()\n", " # Create a multi-line string with all stats\n", " stats_text = (\n", " f\"Mean: {mean_latency:.2f} ms\\n\"\n", " f\"Std: {std_latency:.2f} ms\\n\"\n", " f\"Min: {min_latency:.2f} ms\\n\"\n", " f\"Max: {max_latency:.2f} ms\"\n", " )\n", " # Place text near top right of plot\n", " ax.text(\n", " mean_latency * 1.02, # or just use ax.get_xlim()[1]*0.98 for far right\n", " max_ylim * 0.98, # near the top\n", " stats_text,\n", " verticalalignment='top',\n", " horizontalalignment='left',\n", " fontsize=10,\n", " bbox=dict(facecolor='white', alpha=0.7, boxstyle='round,pad=0.3')\n", " )\n", " plt.savefig(os.path.join(OUT_PATH, f\"plot_e2es_{name}.png\"))\n", " result_strings.append(f\"Chain {topics[0]} --> {topics[-1]} E2E stats: Mean: {mean_latency:.2f} ms, Std: {std_latency:.2f} ms, Min: {min_latency:.2f} ms, Max: {max_latency:.2f} ms\")\n", " # also do it as csv of order: exepriment_name, chain, mean, std, min, max\n", " result_strings_csv.append(\n", " f\"{EXPERIMENT_NAME},{topics[0]} --> {topics[-1]},{mean_latency:.2f},{std_latency:.2f},{min_latency:.2f},{max_latency:.2f}\"\n", " )\n", "\n", " ##################################################\n", " # 5. Violin plot of per-component/stage latencies on the DFG path (across all dataflows)\n", " ##################################################\n", " plt.close(\"E2E path breakdown\")\n", " fig, ax = plt.subplots(num=f\"E2E path breakdown {name}\\n{EXPERIMENT_NAME}\", dpi=300, figsize=(16, 5))\n", " fig.suptitle(f\"E2E Latency Path Breakdown {name}\\n{EXPERIMENT_NAME}\")\n", "\n", " component_breakdowns = [e2e_latency_breakdown(p) for p in tqdm(relevant_dataflows, desc=\"Calculating breakdowns\")]\n", " # Transpose: Each element in component_durations is all values for one component\n", " component_durations = list(zip(*component_breakdowns))\n", " component_latency_items = component_durations\n", "\n", " def extract_class_name(symbol: str) -> str:\n", " # Try to extract class name in pattern: void(ClassName::ClassName...\n", " m = re.search(r'void\\((\\w+)::', symbol)\n", " if m:\n", " return m.group(1)\n", " # If pattern not found, fallback: find last word before first '('\n", " m = re.search(r'(\\w+)\\s*\\(', symbol)\n", " if m:\n", " return m.group(1)\n", " return symbol # fallback to original if all fails\n", "\n", " # For labeling, types etc (assume first dataflow is representative)\n", " labels = [extract_class_name(label_latency_item(item)) for item in e2e_latency_breakdown(relevant_dataflows[0])]\n", " print(f\"Labels: {labels}\")\n", " types = [item.type for item in e2e_latency_breakdown(relevant_dataflows[0])]\n", " # Convert durations from seconds to ms\n", " component_durations_ms = [list(map(lambda item: item.duration * 1000, d)) for d in component_durations]\n", "\n", " legend_entries = []\n", "\n", " def add_label(violin, label):\n", " color = violin[\"bodies\"][0].get_facecolor().flatten()\n", " legend_entries.append((mpatches.Patch(color=color), label))\n", "\n", " #counter = Counter(types)\n", " ## Print all unique elements and their count\n", " #for element, count in counter.items():\n", " # print(f'{element}: {count}')\n", "\n", " for comp_type in (\"idle\", \"dds\", \"cpu\"):\n", " indices = [i for i, t in enumerate(types) if t == comp_type]\n", " xs = [component_durations_ms[i] for i in indices]\n", " if not xs:\n", " continue\n", " vln = ax.violinplot(xs, indices)\n", " add_label(vln, comp_type)\n", " # Export CSVs per component\n", " for i, x in zip(indices, xs):\n", " pd.Series(x).to_csv(\n", " os.path.join(OUT_PATH, f\"plot_e2es_violin_{i:02d}.csv\"),\n", " index=False, header=[\"duration_ms\"]\n", " )\n", " ax.set_ylabel(\"Latency contribution [ms]\")\n", " ax.set_xticks(range(len(labels)), labels, rotation=90)\n", " ax.legend(*zip(*legend_entries))\n", " plt.savefig(os.path.join(OUT_PATH, f\"plot_e2es_violin_{name}.png\"))\n", "\n", " # Save labels and types\n", " pd.Series(labels).to_csv(os.path.join(OUT_PATH, f\"plot_e2es_violin_labels_{name}.csv\"), index=False, header=[\"label\"])\n", " pd.Series(types).to_csv(os.path.join(OUT_PATH, f\"plot_e2es_violin_types_{name}.csv\"), index=False, header=[\"type\"])\n", "\n", " ##################################################\n", " # 6. Per-callback timestamp/duration records for relevant components (e.g., \"cpu\" stages)\n", " ##################################################\n", " for concat_pc_items in component_latency_items:\n", " # Defensive check\n", " if not isinstance(concat_pc_items[0].location[0], TrPublishInstance):\n", " continue\n", " records = [(item.location[0].timestamp, item.duration) for item in concat_pc_items]\n", " pd.DataFrame(records, columns=(\"timestamp\", \"duration\")).to_csv(\n", " os.path.join(OUT_PATH, f\"dur_ts_{concat_pc_items[0].location[0].publisher.topic_name.replace('/', '__')}_{name}.csv\"),\n", " index=False\n", " )\n", "\n", " ##################################################\n", " # 7. Per-callback timing stats (mean, std, min, max) for \"cpu\" stages\n", " ##################################################\n", " cpu_indices = [i for i, t in enumerate(types) if t == \"cpu\"]\n", " cpu_durations = [np.array([item.location[0].duration for item in component_latency_items[i]]) for i in cpu_indices]\n", " cpu_labels = [labels[i] for i in cpu_indices]\n", " stats = [(lbl, x.mean(), x.std(), x.min(), x.max()) for x, lbl in zip(cpu_durations, cpu_labels)]\n", " pd.DataFrame(stats, columns=[\"callback\", \"mean\", \"std\", \"min\", \"max\"]).to_csv(\n", " os.path.join(OUT_PATH, f\"calc_times_{name}.csv\"), index=False\n", " )\n", "\n", " ##################################################\n", " # 8. Stacked breakdown plot (from message_tree plots)\n", " ##################################################\n", " fig = e2e_breakdown_stack(*e2e_breakdowns, name=f\"{name}\\n{EXPERIMENT_NAME}\")\n", " fig.set_size_inches(16, 9)\n", " fig.set_dpi(300)\n", "\n", "for result_string in result_strings:\n", " print(result_string)\n", "\n", "# append csv results to results file one dir up\n", "with open(os.path.join(OUT_PATH, \"..\", \"..\", \"results.csv\"), \"a\") as f:\n", " f.write(\"\\n\".join(result_strings_csv) + \"\\n\")\n", "\n", "print(\"Done.\")" ] } ], "metadata": { "kernelspec": { "display_name": "venv310", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.12" } }, "nbformat": 4, "nbformat_minor": 2 }