From 9848c39b6142a311401c7bd2f17c6c36dba62d26 Mon Sep 17 00:00:00 2001 From: Maximilian Schmeller Date: Thu, 15 Sep 2022 18:04:45 +0200 Subject: [PATCH] Kill me --- trace-analysis.ipynb | 265 +++++++++++++++++-------------------------- 1 file changed, 105 insertions(+), 160 deletions(-) diff --git a/trace-analysis.ipynb b/trace-analysis.ipynb index 748b1e0..ad85793 100644 --- a/trace-analysis.ipynb +++ b/trace-analysis.ipynb @@ -7,6 +7,18 @@ "source": [ "import os\n", "import sys\n", + "import re\n", + "import math\n", + "import graphviz as gv\n", + "from tqdm import tqdm\n", + "from bisect import bisect\n", + "from termcolor import colored\n", + "\n", + "import matplotlib.patches as mpatch\n", + "from scipy import stats\n", + "from cycler import cycler\n", + "\n", + "import glob\n", "\n", "import numpy as np\n", "import pandas as pd\n", @@ -14,7 +26,6 @@ "\n", "from misc.utils import cached, parse_as\n", "\n", - "%load_ext pyinstrument\n", "%matplotlib inline" ], "metadata": { @@ -50,14 +61,14 @@ "# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n", "# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n", "# if kernel traces are also present.\n", - "TR_PATH = \"data/awsim-trace/ust\"\n", + "TR_PATH = \"/home/max/Projects/optimization_framework/artifacts/iteration0_worker1/aw_replay/tracing/scenario-trace/ust\"\n", "\n", "# Path to the folder all artifacts from this notebook are saved to.\n", "# This entails plots as well as data tables.\n", "OUT_PATH = \"out/\"\n", "\n", "# Whether to cache the results of long computations per set of inputs\n", - "CACHING_ENABLED = True\n", + "CACHING_ENABLED = False\n", "\n", "# Whether to annotate topics/publications with bandwidth/message size\n", "BW_ENABLED = False\n", @@ -89,7 +100,7 @@ "DFG_INPUT_NODE_PATTERNS = [r\"^/sensing\"]\n", "# RegEx pattern for nodes that shall be marked as system outputs\n", "# These will be plotted with a double border\n", - "DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\"]\n", + "DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\", \"emergency\"]\n", "# RegEx for nodes which shall not be plotted in the DFG\n", "DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\", r\"transform_listener_impl\"]\n", "\n", @@ -108,11 +119,10 @@ "\n", "# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n", "# E.g. r\"^/control/\" will cover all control topics\n", - "E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/trajectory_follower/control_cmd\"]\n", + "E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/\", r\"^/vehicle\"]\n", "# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n", "# E.g. r\"^/sensing/\" will cover all sensing topics\n", - "E2E_INPUT_TOPIC_PATTERNS = [\"/vehicle/status/\", \"/sensing/imu\"]\n", - "\n", + "E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/imu\"]\n", "\n", "# This code overrides the above constants with environment variables, do not edit.\n", "for env_key, env_value in os.environ.items():\n", @@ -123,10 +133,12 @@ " value = parse_as(type(globals()[key]), env_value)\n", " globals()[key] = value\n", "\n", + "\n", "# Convert input paths to absolute paths\n", "def _expand_path(path):\n", " return os.path.realpath(os.path.expandvars(os.path.expanduser(path)))\n", "\n", + "\n", "TRACING_WS_BUILD_PATH = _expand_path(TRACING_WS_BUILD_PATH)\n", "TR_PATH = _expand_path(TR_PATH)\n", "OUT_PATH = _expand_path(OUT_PATH)\n", @@ -153,7 +165,9 @@ "from tracetools_analysis.loading import load_file\n", "from tracetools_analysis.processor.ros2 import Ros2Handler\n", "\n", - "from tracing_interop.tr_types import TrTimer, TrTopic, TrPublisher, TrPublishInstance, TrCallbackInstance, TrContext" + "from tracing_interop.tr_types import *\n", + "from latency_graph.message_tree import DepTree\n", + "from matching.subscriptions import sanitize" ], "metadata": { "collapsed": false, @@ -185,7 +199,7 @@ " return TrContext(handler)\n", "\n", "\n", - "_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], CACHING_ENABLED)\n", + "_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], not CACHING_ENABLED)\n", "_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n", " \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n", "\n", @@ -223,10 +237,12 @@ "source": [ "from latency_graph import latency_graph as lg\n", "\n", + "\n", "def _make_latency_graph():\n", " return lg.LatencyGraph(_tracing_context)\n", "\n", - "lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], CACHING_ENABLED)" + "\n", + "lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], not CACHING_ENABLED)" ], "metadata": { "collapsed": false, @@ -243,8 +259,6 @@ "%%skip_if_false DFG_ENABLED\n", "%%skip_if_false DFG_PLOT\n", "\n", - "from tracing_interop.tr_types import TrNode, TrCallbackObject, TrSubscriptionObject\n", - "\n", "#################################################\n", "# Plot DFG\n", "#################################################\n", @@ -276,13 +290,13 @@ " 'pointcloud_container': 'sensing',\n", "}\n", "\n", - "import graphviz as gv\n", "\n", "g = gv.Digraph('G', filename=\"latency_graph.gv\",\n", " node_attr={'shape': 'plain'},\n", " graph_attr={'pack': '1'})\n", "g.graph_attr['rankdir'] = 'LR'\n", "\n", + "\n", "def plot_hierarchy(gv_parent, lg_node: lg.LGHierarchyLevel, **subgraph_kwargs):\n", " if lg_node.name == \"[NONE]\":\n", " return\n", @@ -311,6 +325,7 @@ " for ch in lg_node.children:\n", " plot_hierarchy(c, ch, **subgraph_kwargs)\n", "\n", + "\n", "def plot_lg(graph: lg.LatencyGraph):\n", " for top_level_node in graph.top_node.children:\n", " colors = node_colors[node_namespace_mapping.get(top_level_node.name)]\n", @@ -319,6 +334,7 @@ " for edge in graph.edges:\n", " g.edge(f\"{edge.start.id()}:out\", f\"{edge.end.id()}:in\")\n", "\n", + "\n", "plot_lg(lat_graph)\n", "\n", "g.save(\"latency_graph.gv\")\n", @@ -341,8 +357,6 @@ "%%skip_if_false DFG_ENABLED\n", "%%skip_if_false DFG_PLOT\n", "\n", - "import re\n", - "import math\n", "\n", "##################################################\n", "# Compute in/out topics for hierarchy level X\n", @@ -363,6 +377,7 @@ "\n", " return _traverse_node(lat_graph.top_node)\n", "\n", + "\n", "lvl_nodes = get_nodes_on_level(lat_graph)\n", "lvl_nodes = [n for n in lvl_nodes if not any(re.search(p, n.full_name) for p in DFG_EXCL_NODE_PATTERNS)]\n", "\n", @@ -373,6 +388,7 @@ "print(', '.join(map(lambda n: n, output_nodes)))\n", "print(', '.join(map(lambda n: n.full_name, lvl_nodes)))\n", "\n", + "\n", "def _collect_callbacks(n: lg.LGHierarchyLevel):\n", " callbacks = []\n", " callbacks += n.callbacks\n", @@ -387,7 +403,6 @@ " for cb in cbs:\n", " cb_to_node_map[cb.id()] = n\n", "\n", - "\n", "edges_between_nodes = {}\n", "for edge in lat_graph.edges:\n", " from_node = cb_to_node_map.get(edge.start.id())\n", @@ -422,6 +437,7 @@ " g.node(helper_node_name, label=\"\", shape=\"none\", height=\"0\", width=\"0\")\n", " g.edge(helper_node_name, n.full_name)\n", "\n", + "\n", "def compute_e2e_paths(start_nodes, end_nodes, edges):\n", " frontier_paths = [[n] for n in start_nodes]\n", " final_paths = []\n", @@ -446,6 +462,7 @@ " for path in final_paths]\n", " return final_paths\n", "\n", + "\n", "e2e_paths = compute_e2e_paths(input_nodes, output_nodes, edges_between_nodes)\n", "\n", "for (src_name, dst_name), edges in edges_between_nodes.items():\n", @@ -472,9 +489,6 @@ "source": [ "%%skip_if_false E2E_ENABLED\n", "\n", - "from tqdm import tqdm\n", - "from bisect import bisect\n", - "\n", "\n", "def inst_get_dep_msg(inst: TrCallbackInstance):\n", " if inst.callback_object not in _tracing_context.callback_objects.by_callback_object:\n", @@ -512,6 +526,7 @@ " # print(f\"No messages found for topic {sub_obj.subscription.topic}\")\n", " return None\n", "\n", + "\n", "def inst_get_dep_insts(inst: TrCallbackInstance):\n", " if inst.callback_object not in _tracing_context.callback_objects.by_callback_object:\n", " # print(\"Callback not found\")\n", @@ -531,6 +546,7 @@ " insts = [inst for inst in insts if inst is not None]\n", " return insts\n", "\n", + "\n", "def get_cb_dep_cbs(cb: TrCallbackObject):\n", " match cb.owner:\n", " case TrSubscriptionObject() as sub_obj:\n", @@ -591,23 +607,24 @@ " return dep_inst\n", "\n", "\n", - "def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lvl=0, visited_topics=None, is_dep_cb=False, start_time=None):\n", + "def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lvl=0, visited_topics=None, is_dep_cb=False,\n", + " start_time=None):\n", " if visited_topics is None:\n", " visited_topics = set()\n", - " \n", + "\n", " if start_time is None:\n", " start_time = inst.timestamp\n", - " \n", + "\n", " if inst.timestamp - start_time > E2E_TIME_LIMIT_S:\n", " return None\n", - " \n", + "\n", " children_are_dep_cbs = False\n", "\n", " match inst:\n", " case TrPublishInstance(publisher=pub):\n", " if pub.topic_name in visited_topics:\n", " return None\n", - " \n", + "\n", " visited_topics.add(pub.topic_name)\n", " deps = [get_msg_dep_cb(inst)]\n", " case TrCallbackInstance() as cb_inst:\n", @@ -616,7 +633,8 @@ " deps += inst_get_dep_insts(cb_inst)\n", " children_are_dep_cbs = True\n", " case _:\n", - " print(f\"[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, got {type(inst).__name__}\")\n", + " print(\n", + " f\"[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, got {type(inst).__name__}\")\n", " return None\n", " #print(\"Rec level\", lvl)\n", " deps = [dep for dep in deps if dep is not None]\n", @@ -640,21 +658,26 @@ "\n", "end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n", "\n", + "\n", "def build_dep_trees():\n", " all_trees = []\n", " for end_topic in end_topics:\n", " end_topic: TrTopic\n", + " print(f\"====={end_topic.name}\")\n", "\n", " pubs = end_topic.publishers\n", + " print(len(pubs))\n", " for pub in pubs:\n", " msgs = pub.instances\n", + " print(len(msgs))\n", " for msg in tqdm(msgs, desc=f\"Building message chains for topic {end_topic.name}\"):\n", " msg: TrPublishInstance\n", " tree = get_dep_tree(msg)\n", " all_trees.append(tree)\n", " return all_trees\n", "\n", - "trees = cached(\"trees\", build_dep_trees, [TR_PATH], CACHING_ENABLED)" + "\n", + "trees = cached(\"trees\", build_dep_trees, [TR_PATH], not CACHING_ENABLED)" ], "metadata": { "collapsed": false, @@ -671,7 +694,6 @@ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false BW_ENABLED\n", "\n", - "import glob\n", "\n", "\n", "def parse_bytes(string):\n", @@ -688,10 +710,10 @@ "\n", "\n", "def bytes_str(bytes):\n", - " if bytes >= 1024**2:\n", - " return f\"{bytes/(1024**2):.2f} MiB\"\n", + " if bytes >= 1024 ** 2:\n", + " return f\"{bytes / (1024 ** 2):.2f} MiB\"\n", " if bytes >= 1024:\n", - " return f\"{bytes/1024:.2f} KiB\"\n", + " return f\"{bytes / 1024:.2f} KiB\"\n", " return f\"{bytes:.0f} B\"\n", "\n", "\n", @@ -706,7 +728,8 @@ " #print(f\"No data for {topic}\")\n", " continue\n", "\n", - " line_pattern = re.compile(r\"(?P[0-9.]+ [KM]?)B/s from (?P[0-9.]+) messages --- Message size mean: (?P[0-9.]+ [KM]?)B min: (?P[0-9.]+ [KM]?)B max: (?P[0-9.]+ [KM]?)B\\n\")\n", + " line_pattern = re.compile(\n", + " r\"(?P[0-9.]+ [KM]?)B/s from (?P[0-9.]+) messages --- Message size mean: (?P[0-9.]+ [KM]?)B min: (?P[0-9.]+ [KM]?)B max: (?P[0-9.]+ [KM]?)B\\n\")\n", " m = re.fullmatch(line_pattern, lines[-1])\n", " if m is None:\n", " print(f\"Line could not be parsed in {topic}: '{lines[-1]}'\")\n", @@ -731,9 +754,6 @@ "source": [ "%%skip_if_false E2E_ENABLED\n", "\n", - "\n", - "from latency_graph.message_tree import DepTree\n", - "\n", "def leaf_topics(tree: DepTree, lvl=0):\n", " ret_list = []\n", " match tree.head:\n", @@ -743,7 +763,7 @@ " ret_list += [(lvl, None)]\n", "\n", " for dep in tree.deps:\n", - " ret_list += leaf_topics(dep, lvl+1)\n", + " ret_list += leaf_topics(dep, lvl + 1)\n", " return ret_list\n", "\n", "\n", @@ -777,7 +797,7 @@ " match tree.head:\n", " case TrPublishInstance(publisher=pub):\n", " if pub and any(re.search(f, pub.topic_name) for f in input_topic_patterns):\n", - " return [(latency,new_path)]\n", + " return [(latency, new_path)]\n", "\n", " ret_list = []\n", " for dep in tree.deps:\n", @@ -860,7 +880,7 @@ "\n", "\n", "fig, ax = plt.subplots(figsize=(18, 9), num=\"e2e_plot\")\n", - "DS=1\n", + "DS = 1\n", "times = [tree.head.timestamp - trees[0].head.timestamp for tree in trees[::DS]]\n", "\n", "ax2 = ax.twinx()\n", @@ -873,16 +893,19 @@ "ax.plot(times, [np.mean(e2es) for e2es in e2ess[::DS]])\n", "ax.fill_between(times, [np.min(e2es) for e2es in e2ess[::DS]], [np.max(e2es) for e2es in e2ess[::DS]], alpha=.3)\n", "\n", + "\n", "def scatter_topic(topic_name, y=0, **scatter_kwargs):\n", " for pub in topics.by_name[topic_name].publishers:\n", " if not pub:\n", " continue\n", "\n", - " inst_timestamps = [inst.timestamp - trees[0].head.timestamp for inst in pub.instances if inst.timestamp >= trees[0].head.timestamp]\n", + " inst_timestamps = [inst.timestamp - trees[0].head.timestamp for inst in pub.instances if\n", + " inst.timestamp >= trees[0].head.timestamp]\n", " scatter_kwargs_default = {\"marker\": \"x\", \"color\": \"indianred\"}\n", " scatter_kwargs_default.update(scatter_kwargs)\n", " ax.scatter(inst_timestamps, np.full(len(inst_timestamps), fill_value=y), **scatter_kwargs_default)\n", "\n", + "\n", "scatter_topic(\"/autoware/engage\")\n", "scatter_topic(\"/planning/scenario_planning/parking/trajectory\", y=-.04, color=\"cadetblue\")\n", "scatter_topic(\"/planning/scenario_planning/lane_driving/trajectory\", y=-.08, color=\"darkgreen\")\n", @@ -944,6 +967,7 @@ " sort_subtree(dep, sort_func)\n", " return subtree\n", "\n", + "\n", "def _leaf_filter(inst, root):\n", " if root.timestamp - inst.timestamp > E2E_TIME_LIMIT_S:\n", " return False\n", @@ -972,7 +996,8 @@ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", - "from cycler import cycler\n", + "\n", + "\n", "\n", "def dict_safe_append(dictionary, key, value):\n", " if key not in dictionary:\n", @@ -981,8 +1006,8 @@ "\n", "\n", "fig, (ax, ax_rel) = plt.subplots(2, 1, sharex=True, figsize=(60, 30), num=\"crit_plot\")\n", - "ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i/4) for i in range(5)]))\n", - "ax_rel.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i/4) for i in range(5)]))\n", + "ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n", + "ax_rel.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n", "\n", "critical_paths = [critical_path(tree) for tree in relevant_trees[::DS]]\n", "\n", @@ -1010,7 +1035,7 @@ " last_cb_time = None\n", " case TrCallbackInstance(callback_obj=cb, timestamp=t, duration=d):\n", " if last_pub_time is not None:\n", - " assert last_pub_time <= t+d, \"Publication out of CB instance timeframe\"\n", + " assert last_pub_time <= t + d, \"Publication out of CB instance timeframe\"\n", "\n", " match cb.owner:\n", " case TrTimer():\n", @@ -1031,7 +1056,9 @@ "\n", "time_breakdown = {k: np.array(v) for k, v in time_breakdown.items()}\n", "\n", - "timer_cb_times = [sum(inst.duration for inst in path if isinstance(inst, TrCallbackInstance) and isinstance(inst.callback_obj, TrTimer)) for path in critical_paths]\n", + "timer_cb_times = [sum(inst.duration for inst in path if\n", + " isinstance(inst, TrCallbackInstance) and isinstance(inst.callback_obj, TrTimer)) for path in\n", + " critical_paths]\n", "sub_cb_times = [sum(inst.duration for inst in path if isinstance(inst, TrCallbackInstance)) for path in critical_paths]\n", "\n", "labels, values = list(zip(*time_breakdown.items()))\n", @@ -1066,10 +1093,8 @@ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", - "from scipy import stats\n", - "\n", "fig, ax = plt.subplots(figsize=(60, 15), num=\"crit_pdf\")\n", - "ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i/4) for i in range(5)]))\n", + "ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n", "\n", "kde = stats.gaussian_kde(timestep_mags)\n", "xs = np.linspace(timestep_mags.min(), timestep_mags.max(), 1000)\n", @@ -1105,22 +1130,19 @@ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", - "from tracing_interop.tr_types import TrSubscription\n", - "import matplotlib.patches as mpatch\n", - "import math\n", - "\n", "tree = trees[E2E_PLOT_TIMESTAMP]\n", "e2es = e2ess[E2E_PLOT_TIMESTAMP]\n", "e2e_paths = e2e_pathss[E2E_PLOT_TIMESTAMP]\n", "margin_y = .2\n", - "margin_x=0\n", - "arr_width= 1 - margin_y\n", + "margin_x = 0\n", + "arr_width = 1 - margin_y\n", + "\n", "\n", "def cb_str(inst: TrCallbackInstance):\n", " cb: TrCallbackObject = inst.callback_obj\n", " if not cb:\n", " return None\n", - " ret_str = f\"- {inst.duration*1e3:07.3f}ms \"\n", + " ret_str = f\"- {inst.duration * 1e3:07.3f}ms \"\n", " #if cb.callback_symbol:\n", " # ret_str = repr(sanitize(cb.callback_symbol.symbol))\n", "\n", @@ -1131,16 +1153,18 @@ " case TrTimer(period=p, node=node):\n", " p: int\n", " node: TrNode\n", - " ret_str = f\"{ret_str}{node.path if node else None} <- @{1/(p*1e-9):.2f}Hz\"\n", + " ret_str = f\"{ret_str}{node.path if node else None} <- @{1 / (p * 1e-9):.2f}Hz\"\n", " return ret_str\n", "\n", "\n", "def ts_str(inst, prev):\n", - " return f\"{(inst.timestamp - prev)*1e3:+09.3f}ms\"\n", + " return f\"{(inst.timestamp - prev) * 1e3:+09.3f}ms\"\n", + "\n", "\n", "def bw_str(bw):\n", " return bytes_str(bw['mean'])\n", "\n", + "\n", "def trunc_chars(string, w, e2e):\n", " if w < e2e * .005:\n", " return \"\"\n", @@ -1150,6 +1174,7 @@ "\n", " return \"...\" + string[-n_chars:] if n_chars < len(string) else string\n", "\n", + "\n", "#e2e_paths = sorted(e2e_paths, key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True)\n", "#for y, e2e_path in reversed(list(enumerate(e2e_paths))):\n", "# last_pub_ts = None\n", @@ -1165,6 +1190,7 @@ "\n", "legend_entries = {}\n", "\n", + "\n", "def plot_subtree(subtree: DepTree, ax: plt.Axes, y_labels, y=0, next_cb_start=0):\n", " height = fanout(subtree)\n", " inst = subtree.head\n", @@ -1179,13 +1205,16 @@ " r_h = height - margin_y\n", "\n", " r = mpatch.Rectangle((r_x, r_y), r_w, r_h,\n", - " ec=\"cadetblue\" if is_sub else \"indianred\", fc=\"lightblue\" if is_sub else \"lightcoral\", zorder=9000)\n", + " ec=\"cadetblue\" if is_sub else \"indianred\", fc=\"lightblue\" if is_sub else \"lightcoral\",\n", + " zorder=9000)\n", " ax.add_artist(r)\n", "\n", - " text = repr(sanitize(inst.callback_obj.callback_symbol.symbol)) if inst.callback_obj and inst.callback_obj.callback_symbol else \"??\"\n", + " text = repr(sanitize(\n", + " inst.callback_obj.callback_symbol.symbol)) if inst.callback_obj and inst.callback_obj.callback_symbol else \"??\"\n", " text = trunc_chars(text, r_w, t_e2e)\n", " if text:\n", - " ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n", + " ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1, 1, 1, .5),\n", + " zorder=11000)\n", "\n", " if is_sub and \"Subscription CB\" not in legend_entries:\n", " legend_entries[\"Subscription CB\"] = r\n", @@ -1194,7 +1223,7 @@ "\n", " if next_cb_start is not None:\n", " r_x = t_cb - t_start + d_cb - margin_x / 2\n", - " r_y = y + .5 - arr_width/2\n", + " r_y = y + .5 - arr_width / 2\n", " r_w = next_cb_start - (t_cb + d_cb) + margin_x\n", " r_h = arr_width\n", " r = mpatch.Rectangle((r_x, r_y), r_w, r_h, color=\"orange\")\n", @@ -1208,7 +1237,8 @@ "\n", " text = trunc_chars(text, r_w, t_e2e)\n", " if text:\n", - " ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n", + " ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1, 1, 1, .5),\n", + " zorder=11000)\n", "\n", " if \"Idle\" not in legend_entries:\n", " legend_entries[\"Idle\"] = r\n", @@ -1218,14 +1248,14 @@ " if not subtree.deps:\n", " y_labels.append(pub.topic_name if pub else None)\n", "\n", - " scatter = ax.scatter(t_pub - t_start, y+.5, color=\"cyan\", marker=\".\", zorder=10000)\n", + " scatter = ax.scatter(t_pub - t_start, y + .5, color=\"cyan\", marker=\".\", zorder=10000)\n", "\n", " if \"Publication\" not in legend_entries:\n", " legend_entries[\"Publication\"] = scatter\n", "\n", " if next_cb_start is not None:\n", " r_x = t_pub - t_start\n", - " r_y = y + .5 - arr_width/2\n", + " r_y = y + .5 - arr_width / 2\n", " r_w = max(next_cb_start - t_pub + margin_x / 2, 0)\n", " r = mpatch.Rectangle((r_x, r_y), r_w, arr_width, color=\"lightgreen\")\n", " ax.add_artist(r)\n", @@ -1233,7 +1263,8 @@ " text = pub.topic_name\n", " text = trunc_chars(text, r_w, t_e2e)\n", " if text:\n", - " ax.text(r_x + r_w / 2, r_y + arr_width / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n", + " ax.text(r_x + r_w / 2, r_y + arr_width / 2, text, ha=\"center\", va=\"center\",\n", + " backgroundcolor=(1, 1, 1, .5), zorder=11000)\n", "\n", " if \"DDS\" not in legend_entries:\n", " legend_entries[\"DDS\"] = r\n", @@ -1241,7 +1272,8 @@ " topic_stats = msg_sizes.get(pub.topic_name) if BW_ENABLED else None\n", " if topic_stats:\n", " size_str = bw_str(topic_stats)\n", - " ax.text(r_x + r_w / 2, r_y + arr_width + margin_y, size_str, ha=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n", + " ax.text(r_x + r_w / 2, r_y + arr_width + margin_y, size_str, ha=\"center\",\n", + " backgroundcolor=(1, 1, 1, .5), zorder=11000)\n", " else:\n", " print(\"[WARN] Tried to publish to another PublishInstance\")\n", " next_cb_start = None\n", @@ -1252,7 +1284,6 @@ " return height\n", "\n", "\n", - "\n", "fig, ax = plt.subplots(figsize=(36, 20), num=\"path_viz\")\n", "ax.set_ylim(0, len(e2es))\n", "\n", @@ -1300,14 +1331,12 @@ "source": [ "%%skip_if_false E2E_ENABLED\n", "\n", - "from termcolor import colored\n", - "from tqdm import tqdm\n", - "\n", "critical_paths = {}\n", "print(len(relevant_trees))\n", "for tree in tqdm(trees):\n", " crit = critical_path(tree)\n", "\n", + "\n", " def _owner(inst):\n", " match inst:\n", " case TrCallbackInstance(callback_obj=cb_obj):\n", @@ -1324,6 +1353,7 @@ " case _:\n", " raise ValueError()\n", "\n", + "\n", " key = tuple(map(_owner, crit[::-1]))\n", " if key not in critical_paths:\n", " critical_paths[key] = []\n", @@ -1336,7 +1366,8 @@ "for key, paths in items:\n", " path_records = [(\" -> \".join(key), p[0].timestamp, p[0].timestamp - p[-1].timestamp) for p in paths]\n", " out_df.append(path_records)\n", - " print(f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths))/len(paths)*1000:.3f}ms\")\n", + " print(\n", + " f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths)) / len(paths) * 1000:.3f}ms\")\n", " paths_durations = []\n", " for path in paths:\n", " next_inst = None\n", @@ -1371,7 +1402,7 @@ " dur_str = colored(f\"{duration * 1000 :>.3f}ms\", colors[E2E_PLOT_TIMESTAMP])\n", " print(f\" -> {dur_str} {part}\")\n", "\n", - "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\")" + "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)" ], "metadata": { "collapsed": false, @@ -1383,100 +1414,14 @@ { "cell_type": "code", "execution_count": null, + "outputs": [], + "source": [], "metadata": { + "collapsed": false, "pycharm": { "name": "#%%\n" } - }, - "outputs": [], - "source": [ - "%%skip_if_false E2E_ENABLED\n", - "\n", - "from termcolor import colored\n", - "from tqdm import tqdm\n", - "from matching.subscriptions import sanitize\n", - "\n", - "critical_paths = {}\n", - "print(len(relevant_trees))\n", - "for tree in tqdm(trees):\n", - " crit = critical_path(tree)\n", - "\n", - " def _owner(inst):\n", - " match inst:\n", - " case TrCallbackInstance(callback_obj=cb_obj):\n", - " cb_obj: TrCallbackObject\n", - " if cb_obj and cb_obj.callback_symbol:\n", - " sym = repr(sanitize(cb_obj.callback_symbol.symbol))\n", - " else:\n", - " sym = str(cb_obj.id)\n", - " return sym\n", - " case TrPublishInstance(publisher=pub):\n", - " pub: TrPublisher\n", - " topic = pub.topic_name\n", - " return topic\n", - " case _:\n", - " raise ValueError()\n", - "\n", - " key = tuple(map(_owner, crit[::-1]))\n", - " if key not in critical_paths:\n", - " critical_paths[key] = []\n", - " critical_paths[key].append(crit)\n", - "\n", - "items = list(critical_paths.items())\n", - "items.sort(key=lambda pair: len(pair[1]), reverse=True)\n", - "\n", - "out_df = pd.DataFrame(columns=[\"path\", \"timestamp\", \"e2e_latency\"])\n", - "for key, paths in items:\n", - " path_records = [(\" -> \".join(key), p[0].timestamp, p[0].timestamp - p[-1].timestamp) for p in paths]\n", - " out_df.append(path_records)\n", - " print(f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths))/len(paths)*1000:.3f}ms\")\n", - " paths_durations = []\n", - " for path in paths:\n", - " next_inst = None\n", - " durations = []\n", - " for inst in path:\n", - " match inst:\n", - " case TrCallbackInstance(timestamp=t, duration=d):\n", - " if not next_inst:\n", - " durations.append(d)\n", - " else:\n", - " durations.append(min(d, next_inst.timestamp - t))\n", - " case TrPublishInstance(timestamp=t):\n", - " if not next_inst:\n", - " durations.append(0.0)\n", - " else:\n", - " durations.append(next_inst.timestamp - t)\n", - " case _:\n", - " raise ValueError()\n", - " next_inst = inst\n", - " paths_durations.append(durations)\n", - "\n", - " durations = list(map(lambda l: sum(l) / len(l), zip(*paths_durations)))[::-1]\n", - " assert len(durations) == len(key)\n", - " perc = np.percentile(durations, [70, 90, 95, 100])\n", - " colors = [\"green\", \"yellow\", \"red\", \"magenta\"]\n", - " for part, duration in zip(key, durations):\n", - " E2E_PLOT_TIMESTAMP = 0\n", - " for j, p in enumerate(perc):\n", - " if duration < p:\n", - " break\n", - " E2E_PLOT_TIMESTAMP = j\n", - " dur_str = colored(f\"{duration * 1000 :>.3f}ms\", colors[E2E_PLOT_TIMESTAMP])\n", - " print(f\" -> {dur_str} {part}\")\n", - "\n", - "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [], - "source": [] + } } ], "metadata": {