This commit is contained in:
Maximilian Schmeller 2022-09-15 18:04:45 +02:00
parent b456645621
commit 9848c39b61

View file

@ -7,6 +7,18 @@
"source": [
"import os\n",
"import sys\n",
"import re\n",
"import math\n",
"import graphviz as gv\n",
"from tqdm import tqdm\n",
"from bisect import bisect\n",
"from termcolor import colored\n",
"\n",
"import matplotlib.patches as mpatch\n",
"from scipy import stats\n",
"from cycler import cycler\n",
"\n",
"import glob\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
@ -14,7 +26,6 @@
"\n",
"from misc.utils import cached, parse_as\n",
"\n",
"%load_ext pyinstrument\n",
"%matplotlib inline"
],
"metadata": {
@ -50,14 +61,14 @@
"# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n",
"# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n",
"# if kernel traces are also present.\n",
"TR_PATH = \"data/awsim-trace/ust\"\n",
"TR_PATH = \"/home/max/Projects/optimization_framework/artifacts/iteration0_worker1/aw_replay/tracing/scenario-trace/ust\"\n",
"\n",
"# Path to the folder all artifacts from this notebook are saved to.\n",
"# This entails plots as well as data tables.\n",
"OUT_PATH = \"out/\"\n",
"\n",
"# Whether to cache the results of long computations per set of inputs\n",
"CACHING_ENABLED = True\n",
"CACHING_ENABLED = False\n",
"\n",
"# Whether to annotate topics/publications with bandwidth/message size\n",
"BW_ENABLED = False\n",
@ -89,7 +100,7 @@
"DFG_INPUT_NODE_PATTERNS = [r\"^/sensing\"]\n",
"# RegEx pattern for nodes that shall be marked as system outputs\n",
"# These will be plotted with a double border\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\"]\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\", \"emergency\"]\n",
"# RegEx for nodes which shall not be plotted in the DFG\n",
"DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\", r\"transform_listener_impl\"]\n",
"\n",
@ -108,11 +119,10 @@
"\n",
"# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n",
"# E.g. r\"^/control/\" will cover all control topics\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/trajectory_follower/control_cmd\"]\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/\", r\"^/vehicle\"]\n",
"# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n",
"# E.g. r\"^/sensing/\" will cover all sensing topics\n",
"E2E_INPUT_TOPIC_PATTERNS = [\"/vehicle/status/\", \"/sensing/imu\"]\n",
"\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/imu\"]\n",
"\n",
"# This code overrides the above constants with environment variables, do not edit.\n",
"for env_key, env_value in os.environ.items():\n",
@ -123,10 +133,12 @@
" value = parse_as(type(globals()[key]), env_value)\n",
" globals()[key] = value\n",
"\n",
"\n",
"# Convert input paths to absolute paths\n",
"def _expand_path(path):\n",
" return os.path.realpath(os.path.expandvars(os.path.expanduser(path)))\n",
"\n",
"\n",
"TRACING_WS_BUILD_PATH = _expand_path(TRACING_WS_BUILD_PATH)\n",
"TR_PATH = _expand_path(TR_PATH)\n",
"OUT_PATH = _expand_path(OUT_PATH)\n",
@ -153,7 +165,9 @@
"from tracetools_analysis.loading import load_file\n",
"from tracetools_analysis.processor.ros2 import Ros2Handler\n",
"\n",
"from tracing_interop.tr_types import TrTimer, TrTopic, TrPublisher, TrPublishInstance, TrCallbackInstance, TrContext"
"from tracing_interop.tr_types import *\n",
"from latency_graph.message_tree import DepTree\n",
"from matching.subscriptions import sanitize"
],
"metadata": {
"collapsed": false,
@ -185,7 +199,7 @@
" return TrContext(handler)\n",
"\n",
"\n",
"_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], CACHING_ENABLED)\n",
"_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], not CACHING_ENABLED)\n",
"_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n",
" \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n",
"\n",
@ -223,10 +237,12 @@
"source": [
"from latency_graph import latency_graph as lg\n",
"\n",
"\n",
"def _make_latency_graph():\n",
" return lg.LatencyGraph(_tracing_context)\n",
"\n",
"lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], CACHING_ENABLED)"
"\n",
"lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], not CACHING_ENABLED)"
],
"metadata": {
"collapsed": false,
@ -243,8 +259,6 @@
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"from tracing_interop.tr_types import TrNode, TrCallbackObject, TrSubscriptionObject\n",
"\n",
"#################################################\n",
"# Plot DFG\n",
"#################################################\n",
@ -276,13 +290,13 @@
" 'pointcloud_container': 'sensing',\n",
"}\n",
"\n",
"import graphviz as gv\n",
"\n",
"g = gv.Digraph('G', filename=\"latency_graph.gv\",\n",
" node_attr={'shape': 'plain'},\n",
" graph_attr={'pack': '1'})\n",
"g.graph_attr['rankdir'] = 'LR'\n",
"\n",
"\n",
"def plot_hierarchy(gv_parent, lg_node: lg.LGHierarchyLevel, **subgraph_kwargs):\n",
" if lg_node.name == \"[NONE]\":\n",
" return\n",
@ -311,6 +325,7 @@
" for ch in lg_node.children:\n",
" plot_hierarchy(c, ch, **subgraph_kwargs)\n",
"\n",
"\n",
"def plot_lg(graph: lg.LatencyGraph):\n",
" for top_level_node in graph.top_node.children:\n",
" colors = node_colors[node_namespace_mapping.get(top_level_node.name)]\n",
@ -319,6 +334,7 @@
" for edge in graph.edges:\n",
" g.edge(f\"{edge.start.id()}:out\", f\"{edge.end.id()}:in\")\n",
"\n",
"\n",
"plot_lg(lat_graph)\n",
"\n",
"g.save(\"latency_graph.gv\")\n",
@ -341,8 +357,6 @@
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"import re\n",
"import math\n",
"\n",
"##################################################\n",
"# Compute in/out topics for hierarchy level X\n",
@ -363,6 +377,7 @@
"\n",
" return _traverse_node(lat_graph.top_node)\n",
"\n",
"\n",
"lvl_nodes = get_nodes_on_level(lat_graph)\n",
"lvl_nodes = [n for n in lvl_nodes if not any(re.search(p, n.full_name) for p in DFG_EXCL_NODE_PATTERNS)]\n",
"\n",
@ -373,6 +388,7 @@
"print(', '.join(map(lambda n: n, output_nodes)))\n",
"print(', '.join(map(lambda n: n.full_name, lvl_nodes)))\n",
"\n",
"\n",
"def _collect_callbacks(n: lg.LGHierarchyLevel):\n",
" callbacks = []\n",
" callbacks += n.callbacks\n",
@ -387,7 +403,6 @@
" for cb in cbs:\n",
" cb_to_node_map[cb.id()] = n\n",
"\n",
"\n",
"edges_between_nodes = {}\n",
"for edge in lat_graph.edges:\n",
" from_node = cb_to_node_map.get(edge.start.id())\n",
@ -422,6 +437,7 @@
" g.node(helper_node_name, label=\"\", shape=\"none\", height=\"0\", width=\"0\")\n",
" g.edge(helper_node_name, n.full_name)\n",
"\n",
"\n",
"def compute_e2e_paths(start_nodes, end_nodes, edges):\n",
" frontier_paths = [[n] for n in start_nodes]\n",
" final_paths = []\n",
@ -446,6 +462,7 @@
" for path in final_paths]\n",
" return final_paths\n",
"\n",
"\n",
"e2e_paths = compute_e2e_paths(input_nodes, output_nodes, edges_between_nodes)\n",
"\n",
"for (src_name, dst_name), edges in edges_between_nodes.items():\n",
@ -472,9 +489,6 @@
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from tqdm import tqdm\n",
"from bisect import bisect\n",
"\n",
"\n",
"def inst_get_dep_msg(inst: TrCallbackInstance):\n",
" if inst.callback_object not in _tracing_context.callback_objects.by_callback_object:\n",
@ -512,6 +526,7 @@
" # print(f\"No messages found for topic {sub_obj.subscription.topic}\")\n",
" return None\n",
"\n",
"\n",
"def inst_get_dep_insts(inst: TrCallbackInstance):\n",
" if inst.callback_object not in _tracing_context.callback_objects.by_callback_object:\n",
" # print(\"Callback not found\")\n",
@ -531,6 +546,7 @@
" insts = [inst for inst in insts if inst is not None]\n",
" return insts\n",
"\n",
"\n",
"def get_cb_dep_cbs(cb: TrCallbackObject):\n",
" match cb.owner:\n",
" case TrSubscriptionObject() as sub_obj:\n",
@ -591,7 +607,8 @@
" return dep_inst\n",
"\n",
"\n",
"def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lvl=0, visited_topics=None, is_dep_cb=False, start_time=None):\n",
"def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lvl=0, visited_topics=None, is_dep_cb=False,\n",
" start_time=None):\n",
" if visited_topics is None:\n",
" visited_topics = set()\n",
"\n",
@ -616,7 +633,8 @@
" deps += inst_get_dep_insts(cb_inst)\n",
" children_are_dep_cbs = True\n",
" case _:\n",
" print(f\"[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, got {type(inst).__name__}\")\n",
" print(\n",
" f\"[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, got {type(inst).__name__}\")\n",
" return None\n",
" #print(\"Rec level\", lvl)\n",
" deps = [dep for dep in deps if dep is not None]\n",
@ -640,21 +658,26 @@
"\n",
"end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n",
"\n",
"\n",
"def build_dep_trees():\n",
" all_trees = []\n",
" for end_topic in end_topics:\n",
" end_topic: TrTopic\n",
" print(f\"====={end_topic.name}\")\n",
"\n",
" pubs = end_topic.publishers\n",
" print(len(pubs))\n",
" for pub in pubs:\n",
" msgs = pub.instances\n",
" print(len(msgs))\n",
" for msg in tqdm(msgs, desc=f\"Building message chains for topic {end_topic.name}\"):\n",
" msg: TrPublishInstance\n",
" tree = get_dep_tree(msg)\n",
" all_trees.append(tree)\n",
" return all_trees\n",
"\n",
"trees = cached(\"trees\", build_dep_trees, [TR_PATH], CACHING_ENABLED)"
"\n",
"trees = cached(\"trees\", build_dep_trees, [TR_PATH], not CACHING_ENABLED)"
],
"metadata": {
"collapsed": false,
@ -671,7 +694,6 @@
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false BW_ENABLED\n",
"\n",
"import glob\n",
"\n",
"\n",
"def parse_bytes(string):\n",
@ -706,7 +728,8 @@
" #print(f\"No data for {topic}\")\n",
" continue\n",
"\n",
" line_pattern = re.compile(r\"(?P<bw>[0-9.]+ [KM]?)B/s from (?P<n_msgs>[0-9.]+) messages --- Message size mean: (?P<mean>[0-9.]+ [KM]?)B min: (?P<min>[0-9.]+ [KM]?)B max: (?P<max>[0-9.]+ [KM]?)B\\n\")\n",
" line_pattern = re.compile(\n",
" r\"(?P<bw>[0-9.]+ [KM]?)B/s from (?P<n_msgs>[0-9.]+) messages --- Message size mean: (?P<mean>[0-9.]+ [KM]?)B min: (?P<min>[0-9.]+ [KM]?)B max: (?P<max>[0-9.]+ [KM]?)B\\n\")\n",
" m = re.fullmatch(line_pattern, lines[-1])\n",
" if m is None:\n",
" print(f\"Line could not be parsed in {topic}: '{lines[-1]}'\")\n",
@ -731,9 +754,6 @@
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"\n",
"from latency_graph.message_tree import DepTree\n",
"\n",
"def leaf_topics(tree: DepTree, lvl=0):\n",
" ret_list = []\n",
" match tree.head:\n",
@ -873,16 +893,19 @@
"ax.plot(times, [np.mean(e2es) for e2es in e2ess[::DS]])\n",
"ax.fill_between(times, [np.min(e2es) for e2es in e2ess[::DS]], [np.max(e2es) for e2es in e2ess[::DS]], alpha=.3)\n",
"\n",
"\n",
"def scatter_topic(topic_name, y=0, **scatter_kwargs):\n",
" for pub in topics.by_name[topic_name].publishers:\n",
" if not pub:\n",
" continue\n",
"\n",
" inst_timestamps = [inst.timestamp - trees[0].head.timestamp for inst in pub.instances if inst.timestamp >= trees[0].head.timestamp]\n",
" inst_timestamps = [inst.timestamp - trees[0].head.timestamp for inst in pub.instances if\n",
" inst.timestamp >= trees[0].head.timestamp]\n",
" scatter_kwargs_default = {\"marker\": \"x\", \"color\": \"indianred\"}\n",
" scatter_kwargs_default.update(scatter_kwargs)\n",
" ax.scatter(inst_timestamps, np.full(len(inst_timestamps), fill_value=y), **scatter_kwargs_default)\n",
"\n",
"\n",
"scatter_topic(\"/autoware/engage\")\n",
"scatter_topic(\"/planning/scenario_planning/parking/trajectory\", y=-.04, color=\"cadetblue\")\n",
"scatter_topic(\"/planning/scenario_planning/lane_driving/trajectory\", y=-.08, color=\"darkgreen\")\n",
@ -944,6 +967,7 @@
" sort_subtree(dep, sort_func)\n",
" return subtree\n",
"\n",
"\n",
"def _leaf_filter(inst, root):\n",
" if root.timestamp - inst.timestamp > E2E_TIME_LIMIT_S:\n",
" return False\n",
@ -972,7 +996,8 @@
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"from cycler import cycler\n",
"\n",
"\n",
"\n",
"def dict_safe_append(dictionary, key, value):\n",
" if key not in dictionary:\n",
@ -1031,7 +1056,9 @@
"\n",
"time_breakdown = {k: np.array(v) for k, v in time_breakdown.items()}\n",
"\n",
"timer_cb_times = [sum(inst.duration for inst in path if isinstance(inst, TrCallbackInstance) and isinstance(inst.callback_obj, TrTimer)) for path in critical_paths]\n",
"timer_cb_times = [sum(inst.duration for inst in path if\n",
" isinstance(inst, TrCallbackInstance) and isinstance(inst.callback_obj, TrTimer)) for path in\n",
" critical_paths]\n",
"sub_cb_times = [sum(inst.duration for inst in path if isinstance(inst, TrCallbackInstance)) for path in critical_paths]\n",
"\n",
"labels, values = list(zip(*time_breakdown.items()))\n",
@ -1066,8 +1093,6 @@
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"from scipy import stats\n",
"\n",
"fig, ax = plt.subplots(figsize=(60, 15), num=\"crit_pdf\")\n",
"ax.set_prop_cycle(cycler('color', [plt.cm.nipy_spectral(i / 4) for i in range(5)]))\n",
"\n",
@ -1105,10 +1130,6 @@
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"from tracing_interop.tr_types import TrSubscription\n",
"import matplotlib.patches as mpatch\n",
"import math\n",
"\n",
"tree = trees[E2E_PLOT_TIMESTAMP]\n",
"e2es = e2ess[E2E_PLOT_TIMESTAMP]\n",
"e2e_paths = e2e_pathss[E2E_PLOT_TIMESTAMP]\n",
@ -1116,6 +1137,7 @@
"margin_x = 0\n",
"arr_width = 1 - margin_y\n",
"\n",
"\n",
"def cb_str(inst: TrCallbackInstance):\n",
" cb: TrCallbackObject = inst.callback_obj\n",
" if not cb:\n",
@ -1138,9 +1160,11 @@
"def ts_str(inst, prev):\n",
" return f\"{(inst.timestamp - prev) * 1e3:+09.3f}ms\"\n",
"\n",
"\n",
"def bw_str(bw):\n",
" return bytes_str(bw['mean'])\n",
"\n",
"\n",
"def trunc_chars(string, w, e2e):\n",
" if w < e2e * .005:\n",
" return \"\"\n",
@ -1150,6 +1174,7 @@
"\n",
" return \"...\" + string[-n_chars:] if n_chars < len(string) else string\n",
"\n",
"\n",
"#e2e_paths = sorted(e2e_paths, key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True)\n",
"#for y, e2e_path in reversed(list(enumerate(e2e_paths))):\n",
"# last_pub_ts = None\n",
@ -1165,6 +1190,7 @@
"\n",
"legend_entries = {}\n",
"\n",
"\n",
"def plot_subtree(subtree: DepTree, ax: plt.Axes, y_labels, y=0, next_cb_start=0):\n",
" height = fanout(subtree)\n",
" inst = subtree.head\n",
@ -1179,13 +1205,16 @@
" r_h = height - margin_y\n",
"\n",
" r = mpatch.Rectangle((r_x, r_y), r_w, r_h,\n",
" ec=\"cadetblue\" if is_sub else \"indianred\", fc=\"lightblue\" if is_sub else \"lightcoral\", zorder=9000)\n",
" ec=\"cadetblue\" if is_sub else \"indianred\", fc=\"lightblue\" if is_sub else \"lightcoral\",\n",
" zorder=9000)\n",
" ax.add_artist(r)\n",
"\n",
" text = repr(sanitize(inst.callback_obj.callback_symbol.symbol)) if inst.callback_obj and inst.callback_obj.callback_symbol else \"??\"\n",
" text = repr(sanitize(\n",
" inst.callback_obj.callback_symbol.symbol)) if inst.callback_obj and inst.callback_obj.callback_symbol else \"??\"\n",
" text = trunc_chars(text, r_w, t_e2e)\n",
" if text:\n",
" ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n",
" ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1, 1, 1, .5),\n",
" zorder=11000)\n",
"\n",
" if is_sub and \"Subscription CB\" not in legend_entries:\n",
" legend_entries[\"Subscription CB\"] = r\n",
@ -1208,7 +1237,8 @@
"\n",
" text = trunc_chars(text, r_w, t_e2e)\n",
" if text:\n",
" ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n",
" ax.text(r_x + r_w / 2, r_y + r_h / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1, 1, 1, .5),\n",
" zorder=11000)\n",
"\n",
" if \"Idle\" not in legend_entries:\n",
" legend_entries[\"Idle\"] = r\n",
@ -1233,7 +1263,8 @@
" text = pub.topic_name\n",
" text = trunc_chars(text, r_w, t_e2e)\n",
" if text:\n",
" ax.text(r_x + r_w / 2, r_y + arr_width / 2, text, ha=\"center\", va=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n",
" ax.text(r_x + r_w / 2, r_y + arr_width / 2, text, ha=\"center\", va=\"center\",\n",
" backgroundcolor=(1, 1, 1, .5), zorder=11000)\n",
"\n",
" if \"DDS\" not in legend_entries:\n",
" legend_entries[\"DDS\"] = r\n",
@ -1241,7 +1272,8 @@
" topic_stats = msg_sizes.get(pub.topic_name) if BW_ENABLED else None\n",
" if topic_stats:\n",
" size_str = bw_str(topic_stats)\n",
" ax.text(r_x + r_w / 2, r_y + arr_width + margin_y, size_str, ha=\"center\", backgroundcolor=(1,1,1,.5), zorder=11000)\n",
" ax.text(r_x + r_w / 2, r_y + arr_width + margin_y, size_str, ha=\"center\",\n",
" backgroundcolor=(1, 1, 1, .5), zorder=11000)\n",
" else:\n",
" print(\"[WARN] Tried to publish to another PublishInstance\")\n",
" next_cb_start = None\n",
@ -1252,7 +1284,6 @@
" return height\n",
"\n",
"\n",
"\n",
"fig, ax = plt.subplots(figsize=(36, 20), num=\"path_viz\")\n",
"ax.set_ylim(0, len(e2es))\n",
"\n",
@ -1300,14 +1331,12 @@
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from termcolor import colored\n",
"from tqdm import tqdm\n",
"\n",
"critical_paths = {}\n",
"print(len(relevant_trees))\n",
"for tree in tqdm(trees):\n",
" crit = critical_path(tree)\n",
"\n",
"\n",
" def _owner(inst):\n",
" match inst:\n",
" case TrCallbackInstance(callback_obj=cb_obj):\n",
@ -1324,6 +1353,7 @@
" case _:\n",
" raise ValueError()\n",
"\n",
"\n",
" key = tuple(map(_owner, crit[::-1]))\n",
" if key not in critical_paths:\n",
" critical_paths[key] = []\n",
@ -1336,7 +1366,8 @@
"for key, paths in items:\n",
" path_records = [(\" -> \".join(key), p[0].timestamp, p[0].timestamp - p[-1].timestamp) for p in paths]\n",
" out_df.append(path_records)\n",
" print(f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths))/len(paths)*1000:.3f}ms\")\n",
" print(\n",
" f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths)) / len(paths) * 1000:.3f}ms\")\n",
" paths_durations = []\n",
" for path in paths:\n",
" next_inst = None\n",
@ -1371,7 +1402,7 @@
" dur_str = colored(f\"{duration * 1000 :>.3f}ms\", colors[E2E_PLOT_TIMESTAMP])\n",
" print(f\" -> {dur_str} {part}\")\n",
"\n",
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\")"
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)"
],
"metadata": {
"collapsed": false,
@ -1383,100 +1414,14 @@
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [],
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from termcolor import colored\n",
"from tqdm import tqdm\n",
"from matching.subscriptions import sanitize\n",
"\n",
"critical_paths = {}\n",
"print(len(relevant_trees))\n",
"for tree in tqdm(trees):\n",
" crit = critical_path(tree)\n",
"\n",
" def _owner(inst):\n",
" match inst:\n",
" case TrCallbackInstance(callback_obj=cb_obj):\n",
" cb_obj: TrCallbackObject\n",
" if cb_obj and cb_obj.callback_symbol:\n",
" sym = repr(sanitize(cb_obj.callback_symbol.symbol))\n",
" else:\n",
" sym = str(cb_obj.id)\n",
" return sym\n",
" case TrPublishInstance(publisher=pub):\n",
" pub: TrPublisher\n",
" topic = pub.topic_name\n",
" return topic\n",
" case _:\n",
" raise ValueError()\n",
"\n",
" key = tuple(map(_owner, crit[::-1]))\n",
" if key not in critical_paths:\n",
" critical_paths[key] = []\n",
" critical_paths[key].append(crit)\n",
"\n",
"items = list(critical_paths.items())\n",
"items.sort(key=lambda pair: len(pair[1]), reverse=True)\n",
"\n",
"out_df = pd.DataFrame(columns=[\"path\", \"timestamp\", \"e2e_latency\"])\n",
"for key, paths in items:\n",
" path_records = [(\" -> \".join(key), p[0].timestamp, p[0].timestamp - p[-1].timestamp) for p in paths]\n",
" out_df.append(path_records)\n",
" print(f\"======== {len(paths)}x: {sum(map(lambda p: p[0].timestamp - p[-1].timestamp, paths))/len(paths)*1000:.3f}ms\")\n",
" paths_durations = []\n",
" for path in paths:\n",
" next_inst = None\n",
" durations = []\n",
" for inst in path:\n",
" match inst:\n",
" case TrCallbackInstance(timestamp=t, duration=d):\n",
" if not next_inst:\n",
" durations.append(d)\n",
" else:\n",
" durations.append(min(d, next_inst.timestamp - t))\n",
" case TrPublishInstance(timestamp=t):\n",
" if not next_inst:\n",
" durations.append(0.0)\n",
" else:\n",
" durations.append(next_inst.timestamp - t)\n",
" case _:\n",
" raise ValueError()\n",
" next_inst = inst\n",
" paths_durations.append(durations)\n",
"\n",
" durations = list(map(lambda l: sum(l) / len(l), zip(*paths_durations)))[::-1]\n",
" assert len(durations) == len(key)\n",
" perc = np.percentile(durations, [70, 90, 95, 100])\n",
" colors = [\"green\", \"yellow\", \"red\", \"magenta\"]\n",
" for part, duration in zip(key, durations):\n",
" E2E_PLOT_TIMESTAMP = 0\n",
" for j, p in enumerate(perc):\n",
" if duration < p:\n",
" break\n",
" E2E_PLOT_TIMESTAMP = j\n",
" dur_str = colored(f\"{duration * 1000 :>.3f}ms\", colors[E2E_PLOT_TIMESTAMP])\n",
" print(f\" -> {dur_str} {part}\")\n",
"\n",
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": []
}
],
"metadata": {