From 130c99e56f76b4ffb4e3ec1f909b99b7c3f6dbad Mon Sep 17 00:00:00 2001 From: Maximilian Schmeller Date: Fri, 7 Oct 2022 15:35:34 +0900 Subject: [PATCH] Refactorings, better console output. * Removed spam-like warnings (mostly replaced by printing a single warning counter) * Removed inst_runtime_interval() from latency_graph.py as it was out of that file's responsibilities. Moved into TrCallbackInstance directly * Fixed coloring of latency graph (planning was misspelled) --- latency_graph/latency_graph.py | 37 +++++----- trace-analysis.ipynb | 121 ++++++++++++++++++--------------- tracing_interop/tr_types.py | 23 +++++-- 3 files changed, 106 insertions(+), 75 deletions(-) diff --git a/latency_graph/latency_graph.py b/latency_graph/latency_graph.py index 642a762..e2d3d6c 100644 --- a/latency_graph/latency_graph.py +++ b/latency_graph/latency_graph.py @@ -1,8 +1,10 @@ from bisect import bisect_left, bisect_right +from collections import defaultdict from dataclasses import dataclass from itertools import combinations from multiprocessing import Pool from typing import Optional, Set, List, Iterable, Dict, Tuple +from functools import cache import numpy as np from tqdm import tqdm @@ -15,20 +17,15 @@ from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymb TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics", "/rosout"] +@cache def _get_cb_owner_node(cb: TrCallbackObject) -> TrNode | None: match cb.owner: case TrTimer(node=node): - owner_node = node + return node case TrSubscriptionObject(subscription=sub): - owner_node = sub.node + return sub.node case _: - owner_node = None - - if not owner_node: - print("[WARN] CB has no owners") - return None - - return owner_node + return None def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']): @@ -54,12 +51,6 @@ def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']): return base -def inst_runtime_interval(cb_inst: TrCallbackInstance): - start_time = cb_inst.timestamp - end_time = start_time + cb_inst.duration - return start_time, end_time - - def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher): """ Counts number of publication instances that lie within one of the cb_intervals. @@ -68,7 +59,7 @@ def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher): pub_cb_overlaps = {i: set() for i in range(len(pub_insts))} for cb in cbs: - cb_intervals = map(inst_runtime_interval, cb.callback_instances) + cb_intervals = map(lambda inst: (inst.t_start, inst.t_end), cb.callback_instances) for t_start, t_end in cb_intervals: i_overlap_begin = bisect_left(pub_insts, t_start, key=lambda x: x.timestamp) i_overlap_end = bisect_right(pub_insts, t_end, key=lambda x: x.timestamp) @@ -77,14 +68,20 @@ def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher): pub_cbs = set() cb_cb_overlaps = set() + cb_less_pubs = defaultdict(lambda: 0) for i, i_cbs in pub_cb_overlaps.items(): if not i_cbs: - print(f"[WARN] Publication on {pub.topic_name} without corresponding callback!") + cb_less_pubs[pub.topic_name] += 1 elif len(i_cbs) == 1: pub_cbs.update(i_cbs) else: # Multiple CBs in i_cbs cb_cb_overlaps.update(iter(combinations(i_cbs, 2))) + if cb_less_pubs: + print(f"[ENTKÄFERN] There are {len(cb_less_pubs)} topics which have publications from untracked callbacks:") + for topic_name, count in cb_less_pubs.items(): + print(f"--{pub.topic_name:.<120s}: {count:>20d} ownerless publications") + for cb1, cb2 in cb_cb_overlaps: cb1_subset_of_cb2 = True cb2_subset_of_cb1 = True @@ -213,13 +210,19 @@ class LatencyGraph: # Note that nodes can also be None! nodes_to_cbs = {} + ownerless_cbs = 0 for cb in tqdm(tr.callback_objects, desc="Finding CB nodes"): node = _get_cb_owner_node(cb) + if node is None: + ownerless_cbs += 1 + continue if node not in nodes_to_cbs: nodes_to_cbs[node] = set() nodes_to_cbs[node].add(cb) + print(f"[ENTKÄFERN] {ownerless_cbs} callbacks have no owner, filtering them out.") + ################################################## # Find in/out topics for each callback ################################################## diff --git a/trace-analysis.ipynb b/trace-analysis.ipynb index 392a637..df1359e 100644 --- a/trace-analysis.ipynb +++ b/trace-analysis.ipynb @@ -26,6 +26,8 @@ "\n", "from misc.utils import cached, parse_as\n", "\n", + "plt.rcParams['figure.dpi'] = 300\n", + "\n", "%matplotlib inline" ], "metadata": { @@ -58,6 +60,7 @@ "# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n", "# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n", "# if kernel traces are also present.\n", + "# TR_PATH = \"/home/max/Downloads/iteration2_worker1/aw_replay/tracing/scenario-trace/ust\"\n", "TR_PATH = \"data/trace-awsim-x86/ust\"\n", "\n", "# Path to the folder all artifacts from this notebook are saved to.\n", @@ -83,9 +86,9 @@ "\n", "# Whether to compute data flow graphs.\n", "# If you are only interested in E2E latencies, set this to False\n", - "DFG_ENABLED = False\n", + "DFG_ENABLED = True\n", "# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n", - "DFG_PLOT = False\n", + "DFG_PLOT = True\n", "\n", "# The maximum node namespace hierarchy level to be plotted.\n", "# Top-level (1): e.g. /sensing, /control, etc.\n", @@ -116,10 +119,10 @@ "\n", "# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n", "# E.g. r\"^/control/\" will cover all control topics\n", - "E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/.*?/control_cmd\"]\n", + "E2E_OUTPUT_TOPIC_PATTERNS = [r\"emergency/control_cmd\"]\n", "# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n", "# E.g. r\"^/sensing/\" will cover all sensing topics\n", - "E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/\"]\n", + "E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/(lidar/[^c]|[^l])\"]\n", "\n", "# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n", "# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n", @@ -192,7 +195,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "def _load_traces():\n", " file = load_file(TR_PATH)\n", " handler = Ros2Handler.process(file)\n", @@ -229,13 +231,13 @@ "for t in sorted(topics, key=lambda t: t.name):\n", " for f in E2E_INPUT_TOPIC_PATTERNS:\n", " if re.search(f, t.name):\n", - " print(f\"--[ENTKÄFERN] {f} :ß: {t.name}\")\n", + " print(f\"--[ENTKÄFERN] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers))}\")\n", "\n", "print(\"\\n[ENTKÄFERN] OUTPUT TOPICS\")\n", "for t in sorted(topics, key=lambda t: t.name):\n", " for f in E2E_OUTPUT_TOPIC_PATTERNS:\n", " if re.search(f, t.name):\n", - " print(f\"--[ENTKÄFERN] {f} :ß: {t.name}\")" + " print(f\"--[ENTKÄFERN] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers))}\")" ], "metadata": { "collapsed": false @@ -255,7 +257,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "from latency_graph import latency_graph as lg\n", "\n", "\n", @@ -274,7 +275,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false DFG_ENABLED\n", "%%skip_if_false DFG_PLOT\n", "\n", @@ -297,11 +297,11 @@ "node_namespace_mapping = {\n", " 'perception': 'perception',\n", " 'sensing': 'sensing',\n", - " 'plannitokenizeng': 'planning',\n", + " 'planning': 'planning',\n", " 'control': 'control',\n", - " 'awapi': None,\n", - " 'autoware_api': None,\n", - " 'map': None,\n", + " 'awapi': 'system',\n", + " 'autoware_api': 'system',\n", + " 'map': 'system',\n", " 'system': 'system',\n", " 'localization': 'localization',\n", " 'robot_state_publisher': None,\n", @@ -370,7 +370,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false DFG_ENABLED\n", "%%skip_if_false DFG_PLOT\n", "\n", @@ -384,7 +383,7 @@ " if cur_lvl == DFG_MAX_HIER_LEVEL:\n", " return [node]\n", "\n", - " if not node.children and cur_lvl < DFG_MAX_HIER_LEVEL:\n", + " if not node.children:\n", " return [node]\n", "\n", " collected_nodes = []\n", @@ -555,7 +554,7 @@ " i_inst_latest = bisect_right(cb.callback_instances, inst.timestamp, key=lambda x: x.timestamp)\n", "\n", " for inst_before in cb.callback_instances[i_inst_latest::-1]:\n", - " if lg.inst_runtime_interval(inst_before)[-1] < inst.timestamp:\n", + " if inst_before.t_end < inst.timestamp:\n", " return inst_before\n", "\n", " return None\n", @@ -606,11 +605,10 @@ " i_inst_after = bisect_right(cb.callback_instances, msg.timestamp, key=lambda x: x.timestamp)\n", "\n", " for inst in cb.callback_instances[:i_inst_after]:\n", - " inst_start, inst_end = lg.inst_runtime_interval(inst)\n", - " if msg.timestamp > inst_end:\n", + " if msg.timestamp > inst.t_end:\n", " continue\n", "\n", - " assert inst_start <= msg.timestamp <= inst_end\n", + " assert inst.t_start <= msg.timestamp <= inst.t_end\n", "\n", " cb_inst_candidates.append(inst)\n", "\n", @@ -674,7 +672,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "\n", "end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n", @@ -710,12 +707,10 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false BW_ENABLED\n", "\n", "\n", - "\n", "def parse_bytes(string):\n", " match string[-1]:\n", " case 'K':\n", @@ -758,7 +753,7 @@ " msg_sizes[topic] = {'bw': parse_bytes(m.group(\"bw\")),\n", " 'min': parse_bytes(m.group(\"min\")),\n", " 'mean': parse_bytes(m.group(\"mean\")),\n", - " 'max': parse_bytes(m.group(\"max\"))}\n" + " 'max': parse_bytes(m.group(\"max\"))}" ], "metadata": { "collapsed": false @@ -828,23 +823,28 @@ "\n", "\n", "e2ess = []\n", + "e2ess_trees = []\n", "e2e_pathss = []\n", "for tree in trees:\n", " try:\n", " e2es, e2e_paths = zip(*relevant_e2es(tree, E2E_INPUT_TOPIC_PATTERNS))\n", " except ValueError as e:\n", - " print(e)\n", + " print(end='!')\n", " continue\n", " e2ess.append(e2es)\n", + " e2ess_trees.append(tree)\n", " e2e_pathss.append(e2e_paths)\n", "\n", + "print()\n", "print(f\"[ENTKÄFERN] IN-TOPICS FOUND in {len(e2ess)}/{len(trees)} trees\")\n", "in_topics_found = \"\\n \".join(map(lambda pair: f\"{pair[0]:.<110s} {pair[1]:>10d}\", in_topics_found.items()))\n", "print(f\" {in_topics_found}\")\n", "\n", "print(f\"[ENTKÄFERN] LEAF TOPICS FOUND\")\n", "leaf_topics_found = \"\\n \".join(map(lambda pair: f\"{pair[0]:.<110s} {pair[1]:>10d}\", leaf_topics_found.items()))\n", - "print(f\" {leaf_topics_found}\")" + "print(f\" {leaf_topics_found}\")\n", + "\n", + "trees = e2ess_trees # Filter non-E2E trees" ], "metadata": { "collapsed": false @@ -855,12 +855,11 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", "\n", - "fig, ax = plt.subplots(figsize=(18, 9), num=\"e2e_plot\")\n", + "fig, ax = plt.subplots(num=\"e2e_plot\")\n", "DS = 1\n", "times = [tree.head.timestamp - trees[0].head.timestamp for tree in trees[::DS]]\n", "\n", @@ -905,7 +904,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "\n", "def critical_path(self):\n", @@ -971,7 +969,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", @@ -981,7 +978,7 @@ " dictionary[key].append(value)\n", "\n", "\n", - "fig, (ax, ax_rel) = plt.subplots(2, 1, sharex=True, figsize=(60, 30), num=\"crit_plot\")\n", + "fig, (ax, ax_rel) = plt.subplots(2, 1, sharex=True, num=\"crit_plot\")\n", "ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n", "ax_rel.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n", "\n", @@ -1068,17 +1065,17 @@ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", - "fig, ax = plt.subplots(figsize=(60, 15), num=\"crit_pdf\")\n", + "fig, ax = plt.subplots(num=\"crit_pdf\")\n", "ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n", "\n", "kde = stats.gaussian_kde(timestep_mags)\n", "xs = np.linspace(timestep_mags.min(), timestep_mags.max(), 1000)\n", "ax.plot(xs, kde(xs), label=\"End-to-End Latency\")\n", - "perc = 90\n", + "perc = 50\n", "ax.axvline(np.percentile(timestep_mags, perc), label=f\"{perc}th percentile\")\n", "\n", "ax2 = ax.twinx()\n", - "ax2.hist(timestep_mags, 200)\n", + "ax2.hist(timestep_mags, 50)\n", "ax2.set_ylim(0, ax2.get_ylim()[1])\n", "\n", "ax.set_title(\"Time Distribution for E2E Breakdown\")\n", @@ -1099,7 +1096,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", "\n", @@ -1257,7 +1253,7 @@ " return height\n", "\n", "\n", - "fig, ax = plt.subplots(figsize=(36, 20), num=\"path_viz\")\n", + "fig, ax = plt.subplots(num=\"path_viz\")\n", "ax.set_ylim(0, len(e2es))\n", "\n", "y_labels = []\n", @@ -1299,32 +1295,31 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "%%skip_if_false E2E_ENABLED\n", "\n", + "\n", + "def _owner(inst):\n", + " match inst:\n", + " case TrCallbackInstance(callback_obj=cb_obj):\n", + " cb_obj: TrCallbackObject\n", + " if cb_obj and cb_obj.callback_symbol:\n", + " sym = repr(sanitize(cb_obj.callback_symbol.symbol))\n", + " else:\n", + " sym = str(cb_obj.id)\n", + " return sym\n", + " case TrPublishInstance(publisher=pub):\n", + " pub: TrPublisher\n", + " topic = pub.topic_name\n", + " return topic\n", + " case _:\n", + " raise ValueError()\n", + "\n", + "\n", "critical_paths = {}\n", "print(len(relevant_trees))\n", "for tree in tqdm(relevant_trees):\n", " crit = critical_path(tree)\n", "\n", - "\n", - " def _owner(inst):\n", - " match inst:\n", - " case TrCallbackInstance(callback_obj=cb_obj):\n", - " cb_obj: TrCallbackObject\n", - " if cb_obj and cb_obj.callback_symbol:\n", - " sym = repr(sanitize(cb_obj.callback_symbol.symbol))\n", - " else:\n", - " sym = str(cb_obj.id)\n", - " return sym\n", - " case TrPublishInstance(publisher=pub):\n", - " pub: TrPublisher\n", - " topic = pub.topic_name\n", - " return topic\n", - " case _:\n", - " raise ValueError()\n", - "\n", - "\n", " key = tuple(map(_owner, crit[::-1]))\n", " if key not in critical_paths:\n", " critical_paths[key] = []\n", @@ -1387,6 +1382,24 @@ "name": "#%%%%skip_if_false E2E_ENABLED\n" } } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [], + "metadata": { + "collapsed": false + } } ], "metadata": { diff --git a/tracing_interop/tr_types.py b/tracing_interop/tr_types.py index 16c2fad..7a636b5 100644 --- a/tracing_interop/tr_types.py +++ b/tracing_interop/tr_types.py @@ -1,5 +1,6 @@ -from collections import namedtuple, UserList +from collections import namedtuple, UserList, defaultdict from dataclasses import dataclass, field +from functools import cached_property from typing import List, Dict, Optional, Set, TypeVar, Generic, Iterable from tracetools_analysis.processor.ros2 import Ros2Handler @@ -21,20 +22,26 @@ class Index(Generic[IdxItemType]): for idx_name, is_multi in idx_fields.items(): index = {} self.__indices[idx_name] = index - if is_multi: for item in self.__items: key = getattr(item, idx_name) if key not in index: index[key] = [] index[key].append(item) # Also sorted since items are processed in order and only filtered here - else: + else: # Unique index + duplicate_indices = defaultdict(lambda: 0) + for item in self.__items: key = getattr(item, idx_name) if key in index: - print(repr(ValueError(f"Duplicate key: {idx_name}={key}; old={index[key]}; new={item}"))) + duplicate_indices[key] += 1 index[key] = item + if duplicate_indices: + print(f"[ENTKÄFERN] Duplicate Indices in {idx_name}:") + # for key, count in duplicate_indices.items(): + # print(f"--{key:<20d}'s last candidate: {repr(index[key])}") + def __iter__(self): return iter(self.__items) @@ -331,6 +338,14 @@ class TrCallbackInstance: def callback_obj(self) -> Optional['TrCallbackObject']: return self._c.callback_objects.by_callback_object.get(self.callback_object) + @property + def t_start(self): + return self.timestamp + + @cached_property + def t_end(self): + return self.timestamp + self.duration + def __hash__(self): return hash((self.callback_object, self.timestamp, self.duration))