dataflow-analysis/trace-analysis.ipynb
2022-09-19 16:53:26 +02:00

1484 lines
No EOL
55 KiB
Text

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import re\n",
"import math\n",
"import graphviz as gv\n",
"from tqdm import tqdm\n",
"from bisect import bisect_right\n",
"from termcolor import colored\n",
"\n",
"import matplotlib.patches as mpatch\n",
"from scipy import stats\n",
"from cycler import cycler\n",
"\n",
"import glob\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"from matplotlib import pyplot as plt\n",
"\n",
"from misc.utils import cached, parse_as\n",
"\n",
"%matplotlib inline"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"##################################################\n",
"# User Settings\n",
"##################################################\n",
"# Change these to influence the execution of the\n",
"# notebook.\n",
"# You can override these from the command line\n",
"# by defining environment variables with the\n",
"# name of the constants below, prefixed with\n",
"# \"ANA_NB_\".\n",
"# For example, the environment variable\n",
"# \"ANA_NB_TR_PATH\" will override the \"TR_PATH\"\n",
"# setting below.\n",
"##################################################\n",
"\n",
"# The path to the build folder of a ROS2 workspace that contains the\n",
"# tracetools_read and tracetools_analysis folders.\n",
"TRACING_WS_BUILD_PATH = \"~/Projects/autoware/build\"\n",
"\n",
"# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n",
"# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n",
"# if kernel traces are also present.\n",
"TR_PATH = \"/home/max/Projects/optimization_framework/artifacts/iteration0_worker0/aw_replay/tracing/scenario-trace/ust\"\n",
"\n",
"# Path to the folder all artifacts from this notebook are saved to.\n",
"# This entails plots as well as data tables.\n",
"OUT_PATH = \"out/\"\n",
"\n",
"# Whether to cache the results of long computations per set of inputs\n",
"CACHING_ENABLED = False\n",
"\n",
"# Whether to annotate topics/publications with bandwidth/message size\n",
"BW_ENABLED = False\n",
"# Path to a results folder as output by ma-hw-perf-tools/messages/record.bash\n",
"# Used to annotate message sizes in E2E latency calculations\n",
"BW_PATH = \"../ma-hw-perf-tools/data/results\"\n",
"\n",
"# Whether to use dependencies extracted by the Clang-tools to supplement\n",
"# automatic node-internal data flow annotations.\n",
"# If in doubt, set to False.\n",
"CL_ENABLED = False\n",
"# Path to the output directory of the ROS2 dependency checker.\n",
"# Will only be used if CL_ENABLED is True.\n",
"CL_PATH = \"~/Projects/llvm-project/clang-tools-extra/ros2-internal-dependency-checker/output\"\n",
"\n",
"# Whether to compute data flow graphs.\n",
"# If you are only interested in E2E latencies, set this to False\n",
"DFG_ENABLED = False\n",
"# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n",
"DFG_PLOT = False\n",
"\n",
"# The maximum node namespace hierarchy level to be plotted.\n",
"# Top-level (1): e.g. /sensing, /control, etc.\n",
"# Level 3: e.g. /sensing/lidar/pointcloud_processor\n",
"DFG_MAX_HIER_LEVEL = 100\n",
"\n",
"# RegEx pattern for nodes that shall be marked as system inputs\n",
"# These will be plotted with a start arrow as known from automata diagrams\n",
"DFG_INPUT_NODE_PATTERNS = [r\"^/sensing\"]\n",
"# RegEx pattern for nodes that shall be marked as system outputs\n",
"# These will be plotted with a double border\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\", \"emergency\"]\n",
"# RegEx for nodes which shall not be plotted in the DFG\n",
"DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\", r\"transform_listener_impl\"]\n",
"\n",
"# Whether to compute E2E latencies.\n",
"E2E_ENABLED = True\n",
"# Whether to plot end-to-end latency information (ignored if E2E_ENABLED is False)\n",
"E2E_PLOT = False\n",
"# The index of the output message that shall be used in plots that visualize a specific\n",
"# message dependency tree. This index has to be 0 <= n < #output messages\n",
"E2E_PLOT_TIMESTAMP = 200\n",
"# E2E latency threshold. Every E2E latency higher than this is discarded.\n",
"# Set this as low as comfortably possible to speed up calculations.\n",
"# WARNING: If you set this too low (i.e. to E2E latencies that plausibly can happen)\n",
"# your results will be wrong)\n",
"E2E_TIME_LIMIT_S = 2\n",
"\n",
"# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n",
"# E.g. r\"^/control/\" will cover all control topics\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/.*?/control_cmd\"]\n",
"# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n",
"# E.g. r\"^/sensing/\" will cover all sensing topics\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/\"]\n",
"\n",
"# This code overrides the above constants with environment variables, do not edit.\n",
"for env_key, env_value in os.environ.items():\n",
" if env_key.startswith(\"ANA_NB_\"):\n",
" key = env_key.removeprefix(\"ANA_NB_\")\n",
" if key not in globals().keys():\n",
" continue\n",
" value = parse_as(type(globals()[key]), env_value)\n",
" globals()[key] = value\n",
"\n",
"\n",
"# Convert input paths to absolute paths\n",
"def _expand_path(path):\n",
" return os.path.realpath(os.path.expandvars(os.path.expanduser(path)))\n",
"\n",
"\n",
"TRACING_WS_BUILD_PATH = _expand_path(TRACING_WS_BUILD_PATH)\n",
"TR_PATH = _expand_path(TR_PATH)\n",
"OUT_PATH = _expand_path(OUT_PATH)\n",
"BW_PATH = _expand_path(BW_PATH)\n",
"CL_PATH = _expand_path(CL_PATH)\n",
"\n",
"os.makedirs(OUT_PATH, exist_ok=True)\n",
"\n",
"print(\"User Settings:\")\n",
"for k, v in globals().copy().items():\n",
" if not k.isupper():\n",
" continue\n",
" print(f\" {k:.<40s} := {v}\")"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_read/\"))\n",
"sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_analysis/\"))\n",
"from tracetools_read.trace import *\n",
"from tracetools_analysis.loading import load_file\n",
"from tracetools_analysis.processor.ros2 import Ros2Handler\n",
"\n",
"from tracing_interop.tr_types import *\n",
"from latency_graph.message_tree import DepTree\n",
"from matching.subscriptions import sanitize"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "markdown",
"source": [
"# Organize Trace Data"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"def _load_traces():\n",
" file = load_file(TR_PATH)\n",
" handler = Ros2Handler.process(file)\n",
" return TrContext(handler)\n",
"\n",
"\n",
"_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], not CACHING_ENABLED)\n",
"_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n",
" \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n",
"\n",
"# Help the IDE recognize those identifiers\n",
"nodes = publishers = subscriptions = timers = timer_node_links = subscription_objects = callback_objects = callback_symbols = publish_instances = callback_instances = topics = None\n",
"\n",
"for name in _tr_globals:\n",
" globals()[name] = getattr(_tracing_context, name)\n",
"\n",
"print(\"Done.\")"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"for topic in sorted(topics, key=lambda t: t.name):\n",
" topic: TrTopic\n",
" print(f\"{topic.name:.<120s} | {sum(map(lambda p: len(p.instances), topic.publishers))}\")"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "markdown",
"source": [
"# E2E Latency Calculation"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"from latency_graph import latency_graph as lg\n",
"\n",
"\n",
"def _make_latency_graph():\n",
" return lg.LatencyGraph(_tracing_context)\n",
"\n",
"\n",
"lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], not CACHING_ENABLED)"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"#################################################\n",
"# Plot DFG\n",
"#################################################\n",
"\n",
"# Compare with: https://autowarefoundation.github.io/autoware-documentation/main/design/autoware-architecture/node-diagram/\n",
"node_colors = {\n",
" \"sensing\": {\"fill\": \"#e1d5e7\", \"stroke\": \"#9673a6\"},\n",
" \"localization\": {\"fill\": \"#dae8fc\", \"stroke\": \"#6c8ebf\"},\n",
" \"perception\": {\"fill\": \"#d5e8d4\", \"stroke\": \"#82b366\"},\n",
" \"planning\": {\"fill\": \"#fff2cc\", \"stroke\": \"#d6b656\"},\n",
" \"control\": {\"fill\": \"#ffe6cc\", \"stroke\": \"#d79b00\"},\n",
" \"system\": {\"fill\": \"#f8cecc\", \"stroke\": \"#b85450\"},\n",
" \"vehicle_interface\": {\"fill\": \"#b0e3e6\", \"stroke\": \"#0e8088\"},\n",
" None: {\"fill\": \"#f5f5f5\", \"stroke\": \"#666666\"}\n",
"}\n",
"\n",
"node_namespace_mapping = {\n",
" 'perception': 'perception',\n",
" 'sensing': 'sensing',\n",
" 'plannitokenizeng': 'planning',\n",
" 'control': 'control',\n",
" 'awapi': None,\n",
" 'autoware_api': None,\n",
" 'map': None,\n",
" 'system': 'system',\n",
" 'localization': 'localization',\n",
" 'robot_state_publisher': None,\n",
" 'aggregator_node': None,\n",
" 'pointcloud_container': 'sensing',\n",
"}\n",
"\n",
"\n",
"g = gv.Digraph('G', filename=\"latency_graph.gv\",\n",
" node_attr={'shape': 'plain'},\n",
" graph_attr={'pack': '1'})\n",
"g.graph_attr['rankdir'] = 'LR'\n",
"\n",
"\n",
"def plot_hierarchy(gv_parent, lg_node: lg.LGHierarchyLevel, **subgraph_kwargs):\n",
" if lg_node.name == \"[NONE]\":\n",
" return\n",
"\n",
" print(f\"{' ' * lg_node.full_name.count('/')}Processing {lg_node.name}: {len(lg_node.callbacks)}\")\n",
" with gv_parent.subgraph(name=f\"cluster_{lg_node.full_name.replace('/', '__')}\", **subgraph_kwargs) as c:\n",
" c.attr(label=lg_node.name)\n",
" for cb in lg_node.callbacks:\n",
" if isinstance(cb, lg.LGTrCallback):\n",
" tr_cb = cb.cb\n",
" try:\n",
" sym = _tracing_context.callback_symbols.by_id.get(tr_cb.callback_object)\n",
" pretty_sym = repr(sanitize(sym.symbol))\n",
" except KeyError:\n",
" pretty_sym = cb.name\n",
" except TypeError:\n",
" pretty_sym = cb.name\n",
" else:\n",
" pretty_sym = cb.name\n",
"\n",
" pretty_sym = pretty_sym.replace(\"&\", \"&amp;\").replace(\"<\", \"&lt;\").replace(\">\", \"&gt;\")\n",
"\n",
" c.node(cb.id(),\n",
" f'<<table BORDER=\"0\" CELLBORDER=\"1\" CELLSPACING=\"0\"><tr><td port=\"in\"></td><td>{pretty_sym}</td><td port=\"out\"></td></tr></table>>')\n",
"\n",
" for ch in lg_node.children:\n",
" plot_hierarchy(c, ch, **subgraph_kwargs)\n",
"\n",
"\n",
"def plot_lg(graph: lg.LatencyGraph):\n",
" for top_level_node in graph.top_node.children:\n",
" colors = node_colors[node_namespace_mapping.get(top_level_node.name)]\n",
" plot_hierarchy(g, top_level_node, graph_attr={'bgcolor': colors[\"fill\"], 'pencolor': colors[\"stroke\"]})\n",
"\n",
" for edge in graph.edges:\n",
" g.edge(f\"{edge.start.id()}:out\", f\"{edge.end.id()}:in\")\n",
"\n",
"\n",
"plot_lg(lat_graph)\n",
"\n",
"g.save(\"latency_graph.gv\")\n",
"g.render(\"latency_graph.svg\")\n",
"\n",
"g"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"\n",
"##################################################\n",
"# Compute in/out topics for hierarchy level X\n",
"##################################################\n",
"\n",
"def get_nodes_on_level(lat_graph: lg.LatencyGraph):\n",
" def _traverse_node(node: lg.LGHierarchyLevel, cur_lvl=0):\n",
" if cur_lvl == DFG_MAX_HIER_LEVEL:\n",
" return [node]\n",
"\n",
" if not node.children and cur_lvl < DFG_MAX_HIER_LEVEL:\n",
" return [node]\n",
"\n",
" collected_nodes = []\n",
" for ch in node.children:\n",
" collected_nodes += _traverse_node(ch, cur_lvl + 1)\n",
" return collected_nodes\n",
"\n",
" return _traverse_node(lat_graph.top_node)\n",
"\n",
"\n",
"lvl_nodes = get_nodes_on_level(lat_graph)\n",
"lvl_nodes = [n for n in lvl_nodes if not any(re.search(p, n.full_name) for p in DFG_EXCL_NODE_PATTERNS)]\n",
"\n",
"input_nodes = [n.full_name for n in lvl_nodes if any(re.search(p, n.full_name) for p in DFG_INPUT_NODE_PATTERNS)]\n",
"output_nodes = [n.full_name for n in lvl_nodes if any(re.search(p, n.full_name) for p in DFG_OUTPUT_NODE_PATTERNS)]\n",
"\n",
"print(', '.join(map(lambda n: n, input_nodes)))\n",
"print(', '.join(map(lambda n: n, output_nodes)))\n",
"print(', '.join(map(lambda n: n.full_name, lvl_nodes)))\n",
"\n",
"\n",
"def _collect_callbacks(n: lg.LGHierarchyLevel):\n",
" callbacks = []\n",
" callbacks += n.callbacks\n",
" for ch in n.children:\n",
" callbacks += _collect_callbacks(ch)\n",
" return callbacks\n",
"\n",
"\n",
"cb_to_node_map = {}\n",
"for n in lvl_nodes:\n",
" cbs = _collect_callbacks(n)\n",
" for cb in cbs:\n",
" cb_to_node_map[cb.id()] = n\n",
"\n",
"edges_between_nodes = {}\n",
"for edge in lat_graph.edges:\n",
" from_node = cb_to_node_map.get(edge.start.id())\n",
" to_node = cb_to_node_map.get(edge.end.id())\n",
"\n",
" if from_node is None or to_node is None:\n",
" continue\n",
"\n",
" if from_node.full_name == to_node.full_name:\n",
" continue\n",
"\n",
" k = (from_node.full_name, to_node.full_name)\n",
"\n",
" if k not in edges_between_nodes:\n",
" edges_between_nodes[k] = []\n",
"\n",
" edges_between_nodes[k].append(edge)\n",
"\n",
"g = gv.Digraph('G', filename=\"latency_graph.gv\",\n",
" node_attr={'shape': 'plain'},\n",
" graph_attr={'pack': '1'})\n",
"g.graph_attr['rankdir'] = 'LR'\n",
"\n",
"for n in lvl_nodes:\n",
" colors = node_colors[node_namespace_mapping.get(n.full_name.strip(\"/\").split(\"/\")[0])]\n",
" peripheries = \"1\" if n.full_name not in output_nodes else \"2\"\n",
" g.node(n.full_name, label=n.full_name, fillcolor=colors[\"fill\"], color=colors[\"stroke\"],\n",
" shape=\"box\", style=\"filled\", peripheries=peripheries)\n",
"\n",
" if n.full_name in input_nodes:\n",
" helper_node_name = f\"{n.full_name}__before\"\n",
" g.node(helper_node_name, label=\"\", shape=\"none\", height=\"0\", width=\"0\")\n",
" g.edge(helper_node_name, n.full_name)\n",
"\n",
"\n",
"def compute_e2e_paths(start_nodes, end_nodes, edges):\n",
" frontier_paths = [[n] for n in start_nodes]\n",
" final_paths = []\n",
"\n",
" while frontier_paths:\n",
" frontier_paths_new = []\n",
"\n",
" for path in frontier_paths:\n",
" head = path[-1]\n",
" if head in end_nodes:\n",
" final_paths.append(path)\n",
" continue\n",
"\n",
" out_nodes = [n_to for n_from, n_to in edges if n_from == head if n_to not in path]\n",
" new_paths = [path + [n] for n in out_nodes]\n",
" frontier_paths_new += new_paths\n",
"\n",
" frontier_paths = frontier_paths_new\n",
"\n",
" final_paths = [[(n_from, n_to)\n",
" for n_from, n_to in zip(path[:-1], path[1:])]\n",
" for path in final_paths]\n",
" return final_paths\n",
"\n",
"\n",
"e2e_paths = compute_e2e_paths(input_nodes, output_nodes, edges_between_nodes)\n",
"\n",
"for (src_name, dst_name), edges in edges_between_nodes.items():\n",
" print(src_name, dst_name, len(edges))\n",
" color = \"black\" if any((src_name, dst_name) in path for path in e2e_paths) else \"tomato\"\n",
" g.edge(src_name, dst_name, penwidth=str(math.log(len(edges)) * 2 + .2), color=color)\n",
"\n",
"g.save(\"level_graph.gv\")\n",
"g.render(\"level_graph.svg\")\n",
"\n",
"g"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"\n",
"\n",
"def inst_get_dep_msg(inst: TrCallbackInstance):\n",
" if inst.callback_object not in _tracing_context.callback_objects.by_callback_object:\n",
" # print(\"Callback not found (2)\")\n",
" return None\n",
"\n",
" if not isinstance(inst.callback_obj.owner, TrSubscriptionObject):\n",
" # print(f\"Wrong type: {type(inst.callback_obj.owner)}\")\n",
" return None\n",
"\n",
" sub_obj: TrSubscriptionObject = inst.callback_obj.owner\n",
" if sub_obj and sub_obj.subscription and sub_obj.subscription.topic:\n",
" # print(f\"Subscription has no topic\")\n",
" pubs = sub_obj.subscription.topic.publishers\n",
" else:\n",
" pubs = []\n",
"\n",
" def _pub_latest_msg_before(pub: TrPublisher, inst):\n",
" i_latest_msg = bisect_right(pub.instances, inst.timestamp, key=lambda x: x.timestamp) - 1\n",
" if i_latest_msg < 0 or i_latest_msg >= len(pub.instances):\n",
" return None\n",
" latest_msg = pub.instances[i_latest_msg]\n",
" if latest_msg.timestamp >= inst.timestamp:\n",
" return None\n",
"\n",
" return latest_msg\n",
"\n",
" msgs = [_pub_latest_msg_before(pub, inst) for pub in pubs]\n",
" msgs = [msg for msg in msgs if msg is not None]\n",
" msgs.sort(key=lambda i: i.timestamp, reverse=True)\n",
" if msgs:\n",
" msg = msgs[0]\n",
" return msg\n",
"\n",
" # print(f\"No messages found for topic {sub_obj.subscription.topic}\")\n",
" return None\n",
"\n",
"\n",
"def inst_get_dep_insts(inst: TrCallbackInstance):\n",
" if inst.callback_object not in _tracing_context.callback_objects.by_callback_object:\n",
" # print(\"Callback not found\")\n",
" return []\n",
" dep_cbs = get_cb_dep_cbs(inst.callback_obj)\n",
"\n",
" def _cb_to_chronological_inst(cb: TrCallbackObject, inst):\n",
" i_inst_latest = bisect_right(cb.callback_instances, inst.timestamp, key=lambda x: x.timestamp)\n",
"\n",
" for inst_before in cb.callback_instances[i_inst_latest::-1]:\n",
" if lg.inst_runtime_interval(inst_before)[-1] < inst.timestamp:\n",
" return inst_before\n",
"\n",
" return None\n",
"\n",
" insts = [_cb_to_chronological_inst(cb, inst) for cb in dep_cbs]\n",
" insts = [inst for inst in insts if inst is not None]\n",
" return insts\n",
"\n",
"\n",
"def get_cb_dep_cbs(cb: TrCallbackObject):\n",
" match cb.owner:\n",
" case TrSubscriptionObject() as sub_obj:\n",
" sub_obj: TrSubscriptionObject\n",
" owner = sub_obj.subscription.node\n",
" case TrTimer() as tmr:\n",
" tmr: TrTimer\n",
" owner = tmr.node\n",
" case _:\n",
" raise RuntimeError(f\"Encountered {cb.owner} as callback owner\")\n",
"\n",
" owner: TrNode\n",
" dep_sub_objs = {sub.subscription_object for sub in owner.subscriptions}\n",
" dep_cbs = {callback_objects.by_id.get(sub_obj.id) for sub_obj in dep_sub_objs if sub_obj is not None}\n",
" dep_cbs |= {callback_objects.by_id.get(tmr.id) for tmr in owner.timers}\n",
" dep_cbs.discard(cb)\n",
" dep_cbs.discard(None)\n",
"\n",
" return dep_cbs\n",
"\n",
"\n",
"def get_msg_dep_cb(msg: TrPublishInstance):\n",
" \"\"\"\n",
" For a given message instance `msg`, find the publishing callback,\n",
" as well as the message instances that callback depends on (transitively within its TrNode).\n",
" \"\"\"\n",
"\n",
" # Find CB instance that published msg\n",
" # print(f\"PUB {msg.publisher.node.path if msg.publisher.node is not None else '??'} ==> {msg.publisher.topic_name}\")\n",
" pub_cbs = lat_graph.pub_cbs.get(msg.publisher)\n",
" if pub_cbs is None:\n",
" # print(\"Publisher unknown to lat graph. Skipping.\")\n",
" return None\n",
"\n",
" # print(f\"Found {len(pub_cbs)} pub cbs\")\n",
" cb_inst_candidates = []\n",
" for cb in pub_cbs:\n",
" # print(f\" > CB ({len(cb.callback_instances)} instances): {cb.callback_symbol.symbol if cb.callback_symbol else cb.id}\")\n",
" i_inst_after = bisect_right(cb.callback_instances, msg.timestamp, key=lambda x: x.timestamp)\n",
"\n",
" for inst in cb.callback_instances[:i_inst_after]:\n",
" inst_start, inst_end = lg.inst_runtime_interval(inst)\n",
" if msg.timestamp > inst_end:\n",
" continue\n",
"\n",
" assert inst_start <= msg.timestamp <= inst_end\n",
"\n",
" cb_inst_candidates.append(inst)\n",
"\n",
" if len(cb_inst_candidates) > 1:\n",
" # print(\"Found multiple possible callbacks\")\n",
" return None\n",
" if not cb_inst_candidates:\n",
" # print(\"Found no possible callbacks\")\n",
" return None\n",
"\n",
" dep_inst = cb_inst_candidates[0]\n",
" return dep_inst\n",
"\n",
"\n",
"def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lvl=0, visited_topics=None, is_dep_cb=False,\n",
" start_time=None):\n",
" if visited_topics is None:\n",
" visited_topics = set()\n",
"\n",
" if start_time is None:\n",
" start_time = inst.timestamp\n",
"\n",
" if inst.timestamp - start_time > E2E_TIME_LIMIT_S:\n",
" return None\n",
"\n",
" children_are_dep_cbs = False\n",
"\n",
" match inst:\n",
" case TrPublishInstance(publisher=pub):\n",
" if pub.topic_name in visited_topics:\n",
" return None\n",
"\n",
" visited_topics.add(pub.topic_name)\n",
" deps = [get_msg_dep_cb(inst)]\n",
" case TrCallbackInstance() as cb_inst:\n",
" deps = [inst_get_dep_msg(cb_inst)]\n",
" if not is_dep_cb:\n",
" deps += inst_get_dep_insts(cb_inst)\n",
" children_are_dep_cbs = True\n",
" case _:\n",
" print(\n",
" f\"[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, got {type(inst).__name__}\")\n",
" return None\n",
" #print(\"Rec level\", lvl)\n",
" deps = [dep for dep in deps if dep is not None]\n",
" deps = [get_dep_tree(dep, lvl + 1, set(visited_topics), children_are_dep_cbs, start_time) for dep in deps]\n",
" deps = [dep for dep in deps if dep is not None]\n",
" return DepTree(inst, deps)"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"\n",
"end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n",
"\n",
"\n",
"def build_dep_trees():\n",
" all_trees = []\n",
" for end_topic in end_topics:\n",
" end_topic: TrTopic\n",
" print(f\"====={end_topic.name}\")\n",
"\n",
" pubs = end_topic.publishers\n",
" print(len(pubs))\n",
" for pub in pubs:\n",
" msgs = list(pub.instances)\n",
" print(\"\\n\".join(map(str, msgs)))\n",
" for msg in msgs:\n",
" msg: TrPublishInstance\n",
" tree = get_dep_tree(msg)\n",
" all_trees.append(tree)\n",
" print()\n",
" return all_trees\n",
"\n",
"\n",
"try:\n",
" trees = cached(\"trees\", build_dep_trees, [TR_PATH], not CACHING_ENABLED)\n",
"except Exception as e:\n",
" import traceback\n",
" print(e)\n",
" traceback.print_exc()"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false BW_ENABLED\n",
"\n",
"\n",
"\n",
"def parse_bytes(string):\n",
" match string[-1]:\n",
" case 'K':\n",
" exponent = 1e3\n",
" case 'M':\n",
" exponent = 1e6\n",
" case _:\n",
" exponent = 1\n",
"\n",
" num = float(string.split(\" \")[0])\n",
" return num * exponent\n",
"\n",
"\n",
"def bytes_str(bytes):\n",
" if bytes >= 1024 ** 2:\n",
" return f\"{bytes / (1024 ** 2):.2f} MiB\"\n",
" if bytes >= 1024:\n",
" return f\"{bytes / 1024:.2f} KiB\"\n",
" return f\"{bytes:.0f} B\"\n",
"\n",
"\n",
"bw_files = glob.glob(os.path.join(BW_PATH, \"*.log\"))\n",
"msg_sizes = {}\n",
"for bw_file in bw_files:\n",
" with open(bw_file) as f:\n",
" lines = f.readlines()\n",
" topic = os.path.splitext(os.path.split(bw_file)[1])[0].replace(\"__\", \"/\")\n",
"\n",
" if not lines or re.match(f\"^\\s*$\", lines[-1]):\n",
" #print(f\"No data for {topic}\")\n",
" continue\n",
"\n",
" line_pattern = re.compile(\n",
" r\"(?P<bw>[0-9.]+ [KM]?)B/s from (?P<n_msgs>[0-9.]+) messages --- Message size mean: (?P<mean>[0-9.]+ [KM]?)B min: (?P<min>[0-9.]+ [KM]?)B max: (?P<max>[0-9.]+ [KM]?)B\\n\")\n",
" m = re.fullmatch(line_pattern, lines[-1])\n",
" if m is None:\n",
" print(f\"Line could not be parsed in {topic}: '{lines[-1]}'\")\n",
" continue\n",
"\n",
" msg_sizes[topic] = {'bw': parse_bytes(m.group(\"bw\")),\n",
" 'min': parse_bytes(m.group(\"min\")),\n",
" 'mean': parse_bytes(m.group(\"mean\")),\n",
" 'max': parse_bytes(m.group(\"max\"))}\n"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"\n",
"def leaf_topics(tree: DepTree, lvl=0):\n",
" ret_list = []\n",
" match tree.head:\n",
" case TrPublishInstance(publisher=pub):\n",
" if pub:\n",
" ret_list += [(lvl, pub.topic_name)]\n",
" ret_list += [(lvl, None)]\n",
"\n",
" for dep in tree.deps:\n",
" ret_list += leaf_topics(dep, lvl + 1)\n",
" return ret_list\n",
"\n",
"\n",
"def all_e2es(tree: DepTree, t_start=None):\n",
" if t_start is None:\n",
" t_start = tree.head.timestamp\n",
"\n",
" if not tree.deps:\n",
" return [t_start - tree.head.timestamp]\n",
"\n",
" ret_list = []\n",
" for dep in tree.deps:\n",
" ret_list += all_e2es(dep, t_start)\n",
" return ret_list\n",
"\n",
"\n",
"def relevant_e2es(tree: DepTree, input_topic_patterns, t_start=None, path=None):\n",
" if t_start is None:\n",
" t_start = tree.head.timestamp\n",
"\n",
" if path is None:\n",
" path = []\n",
"\n",
" latency = t_start - tree.head.timestamp\n",
" if latency > E2E_TIME_LIMIT_S:\n",
" return []\n",
"\n",
" new_path = [tree.head] + path\n",
"\n",
" if not tree.deps:\n",
" match tree.head:\n",
" case TrPublishInstance(publisher=pub):\n",
" if pub and any(re.search(f, pub.topic_name) for f in input_topic_patterns):\n",
" return [(latency, new_path)]\n",
"\n",
" ret_list = []\n",
" for dep in tree.deps:\n",
" ret_list += relevant_e2es(dep, input_topic_patterns, t_start, new_path)\n",
" return ret_list\n",
"\n",
"\n",
"e2ess = []\n",
"e2e_pathss = []\n",
"for tree in trees:\n",
" e2es, e2e_paths = zip(*relevant_e2es(tree, E2E_INPUT_TOPIC_PATTERNS))\n",
" e2ess.append(e2es)\n",
" e2e_pathss.append(e2e_paths)"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"#from matplotlib.animation import FuncAnimation\n",
"#from IPython import display\n",
"\n",
"#fig, ax = plt.subplots(figsize=(16, 9))\n",
"#ax: plt.Axes\n",
"#ax.set_xlim(0, 4)\n",
"\n",
"#ax.hist([], bins=200, range=(0, 4), histtype='stepfilled')\n",
"#ax.set_title(\"Time: 0.000000s\")\n",
"\n",
"#def anim(frame):\n",
"# print(frame, end='\\r')\n",
"# ax.clear()\n",
"# ax.hist(e2es[frame], bins=200, range=(0, 4), histtype='stepfilled')\n",
"# ax.set_title(f\"Time: {(trees[frame].head.timestamp - trees[0].head.timestamp):.6}s\")\n",
"\n",
"\n",
"#anim_created = FuncAnimation(fig, anim, min(len(trees), 10000), interval=16, repeat_delay=200)\n",
"\n",
"#video = anim_created.save(\"anim.mp4\", dpi=120)\n",
"\n",
"#for tree in trees:\n",
"# path = tree.critical_path(start_topic_filters)\n",
"# for i, inst in enumerate(path[::-1]):\n",
"# match inst:\n",
"# case TrPublishInstance(publisher=pub):\n",
"# print(f\" {i:>3d}: T\", pub.topic_name)\n",
"# case TrCallbackInstance(callback_obj=cb):\n",
"# match cb.owner:\n",
"# case TrSubscriptionObject(subscription=sub):\n",
"# node = sub.node\n",
"# case TrTimer() as tmr:\n",
"# node = tmr.node\n",
"# case _:\n",
"# raise ValueError(f\"Callback owner type not recognized: {type(cb.owner).__name__}\")\n",
"#\n",
"# print(f\" {i:>3d}: N\", node.path)\n",
"# print(\"==================\")\n"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"\n",
"fig, ax = plt.subplots(figsize=(18, 9), num=\"e2e_plot\")\n",
"DS = 1\n",
"times = [tree.head.timestamp - trees[0].head.timestamp for tree in trees[::DS]]\n",
"\n",
"ax2 = ax.twinx()\n",
"ax2.plot(times, list(map(lambda paths: sum(map(len, paths)) / len(paths), e2e_pathss[::DS])), color=\"orange\")\n",
"ax2.fill_between(times,\n",
" list(map(lambda paths: min(map(len, paths)), e2e_pathss[::DS])),\n",
" list(map(lambda paths: max(map(len, paths)), e2e_pathss[::DS])),\n",
" alpha=.3, color=\"orange\")\n",
"\n",
"ax.plot(times, [np.mean(e2es) for e2es in e2ess[::DS]])\n",
"ax.fill_between(times, [np.min(e2es) for e2es in e2ess[::DS]], [np.max(e2es) for e2es in e2ess[::DS]], alpha=.3)\n",
"\n",
"\n",
"def scatter_topic(topic_name, y=0, **scatter_kwargs):\n",
" for pub in topics.by_name[topic_name].publishers:\n",
" if not pub:\n",
" continue\n",
"\n",
" inst_timestamps = [inst.timestamp - trees[0].head.timestamp for inst in pub.instances if\n",
" inst.timestamp >= trees[0].head.timestamp]\n",
" scatter_kwargs_default = {\"marker\": \"x\", \"color\": \"indianred\"}\n",
" scatter_kwargs_default.update(scatter_kwargs)\n",
" ax.scatter(inst_timestamps, np.full(len(inst_timestamps), fill_value=y), **scatter_kwargs_default)\n",
"\n",
"\n",
"scatter_topic(\"/autoware/engage\")\n",
"scatter_topic(\"/planning/scenario_planning/parking/trajectory\", y=-.04, color=\"cadetblue\")\n",
"scatter_topic(\"/planning/scenario_planning/lane_driving/trajectory\", y=-.08, color=\"darkgreen\")\n",
"\n",
"ax.set_xlabel(\"Simulation time [s]\")\n",
"ax.set_ylabel(\"End-to-End latency [s]\")\n",
"ax2.set_ylabel(\"End-to-End path length\")\n",
"None"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"\n",
"def critical_path(self):\n",
" if not self.deps:\n",
" return [self.head]\n",
" return [self.head, *min(map(critical_path, self.deps), key=lambda ls: ls[-1].timestamp)]\n",
"\n",
"\n",
"def e2e_lat(self):\n",
" return self.head.timestamp - critical_path(self)[-1].timestamp\n",
"\n",
"\n",
"def get_relevant_tree(tree: DepTree, accept_leaf=lambda x: True, root=None):\n",
" if root is None:\n",
" root = tree.head\n",
" if not tree.deps:\n",
" if accept_leaf(tree.head, root):\n",
" return tree\n",
" return None\n",
"\n",
" relevant_deps = [get_relevant_tree(dep, accept_leaf, root) for dep in tree.deps]\n",
" if not any(relevant_deps):\n",
" return None\n",
"\n",
" return DepTree(tree.head, [dep for dep in relevant_deps if dep])\n",
"\n",
"\n",
"def fanout(self):\n",
" if not self.deps:\n",
" return 1\n",
"\n",
" return sum(map(fanout, self.deps))\n",
"\n",
"\n",
"def sort_subtree(subtree: DepTree, sort_func=lambda t: t.head.timestamp - e2e_lat(t)):\n",
" subtree.deps.sort(key=sort_func)\n",
" for dep in subtree.deps:\n",
" sort_subtree(dep, sort_func)\n",
" return subtree\n",
"\n",
"\n",
"def _leaf_filter(inst, root):\n",
" if root.timestamp - inst.timestamp > E2E_TIME_LIMIT_S:\n",
" return False\n",
"\n",
" match inst:\n",
" case TrPublishInstance(publisher=pub):\n",
" return pub and any(re.search(f, pub.topic_name) for f in E2E_INPUT_TOPIC_PATTERNS)\n",
" return False\n",
"\n",
"\n",
"relevant_trees = [get_relevant_tree(tree, _leaf_filter) for tree in trees]\n",
"relevant_trees = [t for t in relevant_trees if t]"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"def dict_safe_append(dictionary, key, value):\n",
" if key not in dictionary:\n",
" dictionary[key] = []\n",
" dictionary[key].append(value)\n",
"\n",
"\n",
"fig, (ax, ax_rel) = plt.subplots(2, 1, sharex=True, figsize=(60, 30), num=\"crit_plot\")\n",
"ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n",
"ax_rel.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n",
"\n",
"critical_paths = [critical_path(tree) for tree in relevant_trees[::DS]]\n",
"\n",
"time_breakdown = {}\n",
"\n",
"for path in critical_paths:\n",
" tmr_cb_calc_time = 0.0\n",
" sub_cb_calc_time = 0.0\n",
" tmr_cb_relevant_time = 0.0\n",
" sub_cb_relevant_time = 0.0\n",
" dds_time = 0.0\n",
" idle_time = 0.0\n",
"\n",
" last_pub_time = None\n",
" last_cb_time = None\n",
" for inst in path:\n",
" match inst:\n",
" case TrPublishInstance(timestamp=t):\n",
" assert last_pub_time is None, \"Two publication without callback inbetween\"\n",
"\n",
" if last_cb_time is not None:\n",
" dds_time += last_cb_time - t\n",
"\n",
" last_pub_time = t\n",
" last_cb_time = None\n",
" case TrCallbackInstance(callback_obj=cb, timestamp=t, duration=d):\n",
" if last_pub_time is not None:\n",
" assert last_pub_time <= t + d, \"Publication out of CB instance timeframe\"\n",
"\n",
" match cb.owner:\n",
" case TrTimer():\n",
" tmr_cb_calc_time += d\n",
" tmr_cb_relevant_time += last_pub_time - t\n",
" case TrSubscriptionObject():\n",
" sub_cb_calc_time += d\n",
" sub_cb_relevant_time += last_pub_time - t\n",
" elif last_cb_time is not None:\n",
" idle_time += last_cb_time - (t + d)\n",
" last_pub_time = None\n",
" last_cb_time = t\n",
"\n",
" dict_safe_append(time_breakdown, \"Timer CB\", tmr_cb_relevant_time)\n",
" dict_safe_append(time_breakdown, \"Subscription CB\", sub_cb_relevant_time)\n",
" dict_safe_append(time_breakdown, \"DDS\", dds_time)\n",
" dict_safe_append(time_breakdown, \"Idle\", idle_time)\n",
"\n",
"time_breakdown = {k: np.array(v) for k, v in time_breakdown.items()}\n",
"\n",
"timer_cb_times = [sum(inst.duration for inst in path if\n",
" isinstance(inst, TrCallbackInstance) and isinstance(inst.callback_obj, TrTimer)) for path in\n",
" critical_paths]\n",
"sub_cb_times = [sum(inst.duration for inst in path if isinstance(inst, TrCallbackInstance)) for path in critical_paths]\n",
"\n",
"labels, values = list(zip(*time_breakdown.items()))\n",
"\n",
"#ax.plot(range(len(relevant_trees[::DS])), [e2e_lat(tree) for tree in relevant_trees[::DS]], label=\"Total E2E\")\n",
"ax.stackplot(range(len(relevant_trees[::DS])), values, labels=labels)\n",
"ax.legend()\n",
"ax.set_title(\"End-to-End Latency Breakdown\")\n",
"ax.set_ylabel(\"End-to-End Latency [s]\")\n",
"\n",
"timestep_mags = np.array([sum(vs) for vs in zip(*values)])\n",
"ax_rel.stackplot(range(len(relevant_trees[::DS])), [val / timestep_mags for val in values], labels=labels)\n",
"ax_rel.set_title(\"End-to-End Latency Breakdown (relative)\")\n",
"ax_rel.set_ylabel(\"End-to-End Latency Fraction\")\n",
"ax_rel.set_xlabel(\"Timestep\")\n",
"ax_rel.legend()\n",
"\n",
"None"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"\n",
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"fig, ax = plt.subplots(figsize=(60, 15), num=\"crit_pdf\")\n",
"ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n",
"\n",
"kde = stats.gaussian_kde(timestep_mags)\n",
"xs = np.linspace(timestep_mags.min(), timestep_mags.max(), 1000)\n",
"ax.plot(xs, kde(xs), label=\"End-to-End Latency\")\n",
"perc = 90\n",
"ax.axvline(np.percentile(timestep_mags, perc), label=f\"{perc}th percentile\")\n",
"\n",
"ax2 = ax.twinx()\n",
"ax2.hist(timestep_mags, 200)\n",
"ax2.set_ylim(0, ax2.get_ylim()[1])\n",
"\n",
"ax.set_title(\"Time Distribution for E2E Breakdown\")\n",
"ax.set_xlabel(\"Time [s]\")\n",
"ax.set_ylabel(\"Frequency\")\n",
"ax.set_xlim(0, 2.01)\n",
"ax.set_ylim(0, ax.get_ylim()[1])\n",
"ax.legend()\n",
"\n",
"None"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"tree = relevant_trees[E2E_PLOT_TIMESTAMP]\n",
"e2es = e2ess[E2E_PLOT_TIMESTAMP]\n",
"e2e_paths = e2e_pathss[E2E_PLOT_TIMESTAMP]\n",
"margin_y = .2\n",
"margin_x = 0\n",
"arr_width = 1 - margin_y\n",
"\n",
"\n",
"def cb_str(inst: TrCallbackInstance):\n",
" cb: TrCallbackObject = inst.callback_obj\n",
" if not cb:\n",
" return None\n",
" ret_str = f\"- {inst.duration * 1e3:07.3f}ms \"\n",
" #if cb.callback_symbol:\n",
" # ret_str = repr(sanitize(cb.callback_symbol.symbol))\n",
"\n",
" match cb.owner:\n",
" case TrSubscriptionObject(subscription=sub):\n",
" sub: TrSubscription\n",
" ret_str = f\"{ret_str}{sub.node.path if sub.node else None} <- {sub.topic_name}\"\n",
" case TrTimer(period=p, node=node):\n",
" p: int\n",
" node: TrNode\n",
" ret_str = f\"{ret_str}{node.path if node else None} <- @{1 / (p * 1e-9):.2f}Hz\"\n",
" return ret_str\n",
"\n",
"\n",
"def ts_str(inst, prev):\n",
" return f\"{(inst.timestamp - prev) * 1e3:+09.3f}ms\"\n",
"\n",
"\n",
"def bw_str(bw):\n",
" return bytes_str(bw['mean'])\n",
"\n",
"\n",
"def trunc_chars(string, w, e2e):\n",
" if w < e2e * .005:\n",
" return \"\"\n",
" n_chars = max(math.floor(w / (e2e * .17) * 65) - 5, 0)\n",
" if n_chars < 4:\n",
" return \"\"\n",
"\n",
" return \"...\" + string[-n_chars:] if n_chars < len(string) else string\n",
"\n",
"\n",
"#e2e_paths = sorted(e2e_paths, key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True)\n",
"#for y, e2e_path in reversed(list(enumerate(e2e_paths))):\n",
"# last_pub_ts = None\n",
"# last_cb_end = None\n",
"# print(f\"=== {y}:\")\n",
"# for inst in e2e_path:\n",
"\n",
"#tree = sort_subtree(get_relevant_tree(tree, _leaf_filter))\n",
"\n",
"t_start = tree.head.timestamp\n",
"t_min = t_start - e2e_lat(tree)\n",
"t_e2e = t_start - t_min\n",
"\n",
"legend_entries = {}\n",
"\n",
"\n",
"def plot_subtree(subtree: DepTree, ax: plt.Axes, y_labels, y=0, next_cb_start=0):\n",
" height = fanout(subtree)\n",
" inst = subtree.head\n",
"\n",
" match inst:\n",
" case TrCallbackInstance(timestamp=t_cb, duration=d_cb):\n",
" is_sub = isinstance(inst.callback_obj.owner, TrSubscriptionObject)\n",
"\n",
" r_x = t_cb - t_start + margin_x / 2\n",
" r_y = y + margin_y / 2\n",
" r_w = max(d_cb - margin_x, 0)\n",
" r_h = height - margin_y\n",
"\n",
" r = mpatch.Rectangle((r_x, r_y), r_w, r_h,\n",
" ec=\"cadetblue\" if is_sub else \"indianred\", fc=\"lightblue\" if is_sub else \"lightcoral\",\n",
" zorder=9000)\n",
" ax.add_artist(r)\n",
"\n",
" text = repr(sanitize(\n",
" inst.callback_obj.callback_symbol.symbol)) if inst.callback_obj and inst.callback_obj.callback_symbol else \"??\"\n",
" text = trunc_chars(text, r_w, t_e2e)\n",
" if text:\n",
" ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1, 1, 1, .5),\n",
" zorder=11000)\n",
"\n",
" if is_sub and \"Subscription CB\" not in legend_entries:\n",
" legend_entries[\"Subscription CB\"] = r\n",
" elif not is_sub and \"Timer CB\" not in legend_entries:\n",
" legend_entries[\"Timer CB\"] = r\n",
"\n",
" if next_cb_start is not None:\n",
" r_x = t_cb - t_start + d_cb - margin_x / 2\n",
" r_y = y + .5 - arr_width / 2\n",
" r_w = next_cb_start - (t_cb + d_cb) + margin_x\n",
" r_h = arr_width\n",
" r = mpatch.Rectangle((r_x, r_y), r_w, r_h, color=\"orange\")\n",
" ax.add_artist(r)\n",
"\n",
" if is_sub:\n",
" node = inst.callback_obj.owner.subscription.node\n",
" else:\n",
" node = inst.callback_obj.owner.node\n",
" text = node.path\n",
"\n",
" text = trunc_chars(text, r_w, t_e2e)\n",
" if text:\n",
" ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1, 1, 1, .5),\n",
" zorder=11000)\n",
"\n",
" if \"Idle\" not in legend_entries:\n",
" legend_entries[\"Idle\"] = r\n",
"\n",
" next_cb_start = t_cb\n",
" case TrPublishInstance(timestamp=t_pub, publisher=pub):\n",
" if not subtree.deps:\n",
" y_labels.append(pub.topic_name if pub else None)\n",
"\n",
" scatter = ax.scatter(t_pub - t_start, y + .5, color=\"cyan\", marker=\".\", zorder=10000)\n",
"\n",
" if \"Publication\" not in legend_entries:\n",
" legend_entries[\"Publication\"] = scatter\n",
"\n",
" if next_cb_start is not None:\n",
" r_x = t_pub - t_start\n",
" r_y = y + .5 - arr_width / 2\n",
" r_w = max(next_cb_start - t_pub + margin_x / 2, 0)\n",
" r = mpatch.Rectangle((r_x, r_y), r_w, arr_width, color=\"lightgreen\")\n",
" ax.add_artist(r)\n",
" if pub:\n",
" text = pub.topic_name\n",
" text = trunc_chars(text, r_w, t_e2e)\n",
" if text:\n",
" ax.text(r_x + r_w / 2, r_y + arr_width / 2, text, ha=\"center\", va=\"center\",\n",
" backgroundcolor=(1, 1, 1, .5), zorder=11000)\n",
"\n",
" if \"DDS\" not in legend_entries:\n",
" legend_entries[\"DDS\"] = r\n",
"\n",
" topic_stats = msg_sizes.get(pub.topic_name) if BW_ENABLED else None\n",
" if topic_stats:\n",
" size_str = bw_str(topic_stats)\n",
" ax.text(r_x + r_w / 2, r_y + arr_width + margin_y, size_str, ha=\"center\",\n",
" backgroundcolor=(1, 1, 1, .5), zorder=11000)\n",
" else:\n",
" print(\"[WARN] Tried to publish to another PublishInstance\")\n",
" next_cb_start = None\n",
"\n",
" acc_fanout = 0\n",
" for dep in subtree.deps:\n",
" acc_fanout += plot_subtree(dep, ax, y_labels, y + acc_fanout, next_cb_start)\n",
" return height\n",
"\n",
"\n",
"fig, ax = plt.subplots(figsize=(36, 20), num=\"path_viz\")\n",
"ax.set_ylim(0, len(e2es))\n",
"\n",
"y_labels = []\n",
"plot_subtree(tree, ax, y_labels)\n",
"\n",
"tree_e2e = e2e_lat(tree)\n",
"plot_margin_x = .01 * tree_e2e\n",
"ax.set_xlim(-tree_e2e - plot_margin_x, plot_margin_x)\n",
"ax.set_yticks(np.array(range(len(y_labels))) + .5, y_labels)\n",
"ax.set_title(f\"Timestep {E2E_PLOT_TIMESTAMP}: {(tree.head.timestamp - relevant_trees[0].head.timestamp):10.6f}s\")\n",
"ax.set_xlabel(\"Time relative to output message [s]\")\n",
"ax.set_ylabel(\"Start topic\")\n",
"\n",
"labels, handles = list(zip(*legend_entries.items()))\n",
"ax.legend(handles, labels)\n",
"print(len(y_labels))"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
},
{
"cell_type": "markdown",
"source": [
"# Find Top Critical Paths\n",
"\n",
"For each message tree, find its critical path and runner-up candidates. Then, for the whole timeseries, find the top critical paths by occurence count.\n",
"Critical paths are abstracted by their E2E-latency and a list of passed CBs and topics.\n"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%% md\n"
}
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"\n",
"%%skip_if_false E2E_ENABLED\n",
"\n",
"critical_paths = {}\n",
"print(len(relevant_trees))\n",
"for tree in tqdm(relevant_trees):\n",
" crit = critical_path(tree)\n",
"\n",
"\n",
" def _owner(inst):\n",
" match inst:\n",
" case TrCallbackInstance(callback_obj=cb_obj):\n",
" cb_obj: TrCallbackObject\n",
" if cb_obj and cb_obj.callback_symbol:\n",
" sym = repr(sanitize(cb_obj.callback_symbol.symbol))\n",
" else:\n",
" sym = str(cb_obj.id)\n",
" return sym\n",
" case TrPublishInstance(publisher=pub):\n",
" pub: TrPublisher\n",
" topic = pub.topic_name\n",
" return topic\n",
" case _:\n",
" raise ValueError()\n",
"\n",
"\n",
" key = tuple(map(_owner, crit[::-1]))\n",
" if key not in critical_paths:\n",
" critical_paths[key] = []\n",
" critical_paths[key].append(crit)\n",
"\n",
"items = list(critical_paths.items())\n",
"items.sort(key=lambda pair: len(pair[1]), reverse=True)\n",
"\n",
"out_df = pd.DataFrame(columns=[\"path\", \"timestamp\", \"e2e_latency\"])\n",
"for key, paths in items:\n",
" path_records = [{\"path\": \" -> \".join(key),\n",
" \"timestamp\": p[0].timestamp,\n",
" \"e2e_latency\": p[0].timestamp - p[-1].timestamp}\n",
" for p in paths]\n",
" out_df = out_df.append(path_records)\n",
" print(\n",
" f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths)) / len(paths) * 1000:.3f}ms\")\n",
" paths_durations = []\n",
" for path in paths:\n",
" next_inst = None\n",
" durations = []\n",
" for inst in path:\n",
" match inst:\n",
" case TrCallbackInstance(timestamp=t, duration=d):\n",
" if not next_inst:\n",
" durations.append(d)\n",
" else:\n",
" durations.append(min(d, next_inst.timestamp - t))\n",
" case TrPublishInstance(timestamp=t):\n",
" if not next_inst:\n",
" durations.append(0.0)\n",
" else:\n",
" durations.append(next_inst.timestamp - t)\n",
" case _:\n",
" raise ValueError()\n",
" next_inst = inst\n",
" paths_durations.append(durations)\n",
"\n",
" durations = list(map(lambda l: sum(l) / len(l), zip(*paths_durations)))[::-1]\n",
" assert len(durations) == len(key)\n",
" perc = np.percentile(durations, [70, 90, 95, 100])\n",
" colors = [\"green\", \"yellow\", \"red\", \"magenta\"]\n",
" for part, duration in zip(key, durations):\n",
" E2E_PLOT_TIMESTAMP = 0\n",
" for j, p in enumerate(perc):\n",
" if duration < p:\n",
" break\n",
" E2E_PLOT_TIMESTAMP = j\n",
" dur_str = colored(f\"{duration * 1000 :>.3f}ms\", colors[E2E_PLOT_TIMESTAMP])\n",
" print(f\" -> {dur_str} {part}\")\n",
"\n",
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n"
],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
}
}
],
"metadata": {
"interpreter": {
"hash": "e7370f93d1d0cde622a1f8e1c04877d8463912d04d973331ad4851f04de6915a"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}