diff --git a/message_tree/message_tree_algorithms.py b/message_tree/message_tree_algorithms.py index 6be5df6..33c73f3 100644 --- a/message_tree/message_tree_algorithms.py +++ b/message_tree/message_tree_algorithms.py @@ -8,7 +8,7 @@ from tqdm import tqdm from latency_graph.latency_graph_structure import LatencyGraph from matching.subscriptions import sanitize -from message_tree.message_tree_structure import DepTree, E2EBreakdownItem +from message_tree.message_tree_structure import DepTree, E2EBreakdownItem, depth, size from tracing_interop.tr_types import TrCallbackInstance, TrPublishInstance, TrPublisher, TrCallbackObject, TrContext, \ TrSubscriptionObject, TrTimer, TrNode, TrTopic @@ -104,7 +104,7 @@ def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limi pubs = end_topic.publishers for pub in pubs: msgs = list(pub.instances) - for msg in tqdm(msgs, desc="Processing output messages"): + for msg in tqdm(msgs, mininterval=5.0, desc="Processing output messages"): msg: TrPublishInstance tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s) all_trees.append(tree) @@ -233,8 +233,17 @@ def e2e_paths_sorted_desc(tree: DepTree, input_topic_patterns): `input_topic_patterns`. The paths are sorted by length in a descending manner (element 0 is longest). """ - def _collect_all_paths(t: DepTree): - return [(*_collect_all_paths(d), t.head) for d in t.deps] + # TODO: Make this a Dijkstra/similar implementation instead. This is so slow it's funny. + def _collect_all_paths(t: DepTree, lvl=0): + """ + Returns a flat list of all paths that lead from t's root to a leaf. + """ + if lvl > 30 or not t.deps: + return [(t.head, )] + + # List of flat lists of paths + deps_paths = [_collect_all_paths(d, lvl + 1) for d in t.deps] + return [(*path, t.head) for dep_paths in deps_paths for path in dep_paths] def _trim_path(path): valid_input = False @@ -250,14 +259,17 @@ def e2e_paths_sorted_desc(tree: DepTree, input_topic_patterns): if not valid_input: return None - if i == 0 or not isinstance(path[i - 1], TrCallbackInstance): - print(f"[WARN] Message has no publishing callback in dep tree.") - return path[i:] # Return path from first message that fits an input topic pattern + if i == 0: + #print(end=".") + return None + if not isinstance(path[i - 1], TrCallbackInstance): + #print(end="#") + return None return path[i - 1:] # Return path from its publishing callback if it exists paths = _collect_all_paths(tree) - paths = list(filter(lambda p: p is not None, map(_trim_path, tqdm(paths, desc="_trim_path")))) + paths = list(filter(None, map(_trim_path, paths))) paths.sort(key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True) return paths @@ -328,7 +340,7 @@ def _repr_path(path: List[TrPublishInstance | TrCallbackInstance]): def aggregate_e2e_paths(paths: List[List[TrPublishInstance | TrCallbackInstance]]): path_cohorts = defaultdict(list) - for path in paths: + for path in tqdm(paths, mininterval=5.0, desc="Aggregating E2E path cohorts"): key = _repr_path(path) path_cohorts[key].append(path) diff --git a/message_tree/message_tree_plots.py b/message_tree/message_tree_plots.py index c4f8ece..b9d6ed0 100644 --- a/message_tree/message_tree_plots.py +++ b/message_tree/message_tree_plots.py @@ -40,7 +40,7 @@ def e2e_breakdown_inst_stack(*paths: List[E2EBreakdownItem]): fig.suptitle("Detailed E2E Latency Path Breakdown") bottom = 0 - for i in range(len(paths)): + for i in range(len(paths[0])): e2e_items = [path[i] for path in paths] durations = np.array([item.duration for item in e2e_items]) ax.bar(range(len(paths)), durations, bottom=bottom) diff --git a/message_tree/message_tree_structure.py b/message_tree/message_tree_structure.py index 7356d4d..6c32211 100644 --- a/message_tree/message_tree_structure.py +++ b/message_tree/message_tree_structure.py @@ -5,12 +5,16 @@ E2EBreakdownItem = namedtuple("E2EBreakdownItem", ("type", "duration", "location DepTree = namedtuple("DepTree", ("head", "deps")) -def depth(tree: DepTree): - return 1 + max(map(depth, tree.deps), default=0) +def depth(tree: DepTree, lvl=0): + if lvl > 1000: + return 0 + return 1 + max(map(lambda d: depth(d, lvl + 1), tree.deps), default=0) -def size(tree: DepTree): - return 1 + sum(map(size, tree.deps)) +def size(tree: DepTree, lvl=0): + if lvl > 1000: + return 0 + return 1 + sum(map(lambda d: size(d, lvl + 1), tree.deps)) def fanout(tree: DepTree): diff --git a/trace-analysis.ipynb b/trace-analysis.ipynb index 7c2f59a..e05b69f 100644 --- a/trace-analysis.ipynb +++ b/trace-analysis.ipynb @@ -25,6 +25,7 @@ "\n", "from misc.utils import cached, parse_as\n", "\n", + "# TODO: This is useless for some reason (goal is to make graphs in notebook large and hi-res)\n", "plt.rcParams['figure.dpi'] = 300\n", "\n", "%matplotlib inline" @@ -134,10 +135,10 @@ "\n", "# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n", "# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n", - "E2E_EXCLUDED_PATH_PATTERNS = [r\"NDTScanMatcher\"]\n", + "E2E_EXCLUDED_PATH_PATTERNS = [r\"NDTScanMatcher\", r\"^/parameter_events\"]\n", "\n", "\n", - "DDD = False\n", + "DEBUG = False\n", "\n", "# This code overrides the above constants with environment variables, do not edit.\n", "for env_key, env_value in os.environ.items():\n", @@ -443,7 +444,7 @@ "source": [ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false E2E_PLOT\n", - "%%skip_if_false DDD\n", + "%%skip_if_false DEBUG\n", "\n", "\n", "fig, ax = plt.subplots(num=\"e2e_plot\")\n", @@ -486,33 +487,53 @@ "collapsed": false } }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "%%skip_if_false DEBUG\n", + "\n", + "import pickle\n", + "with open(\"trees.pkl\", \"rb\") as f:\n", + " trees = pickle.load(f)" + ], + "metadata": { + "collapsed": false + } + }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "%%skip_if_false E2E_ENABLED\n", - "%%skip_if_false E2E_PLOT\n", - "\n", - "%load_ext line_profiler\n", - "\n", "\n", "from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n", "from message_tree.message_tree_plots import e2e_breakdown_type_hist\n", "\n", - "def _map_tree_to_e2es(tree):\n", - " return e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS)\n", + "trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0,\n", + " desc=\"Extracting E2E paths\")]\n", + "all_paths = [p for paths in trees_paths for p in paths]\n", + "#all_e2e_items = [i for p in all_paths for i in p]" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "%%skip_if_false E2E_ENABLED\n", + "%%skip_if_false E2E_PLOT\n", "\n", - "def ppp():\n", - " trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, desc=\"Extracting E2E paths\")]\n", - " all_paths = [p for paths in trees_paths for p in paths]\n", - " all_e2e_items = [i for p in all_paths for i in p]\n", + "from message_tree.message_tree_algorithms import e2e_latency_breakdown\n", "\n", - " return all_e2e_items\n", - "\n", - "%lprun -f e2e_paths_sorted_desc ppp()\n", - "\n", - "e2e_breakdown_type_hist(ppp())\n", + "conv_items = [i for p in tqdm(all_paths, mininterval=5.0, desc=\"Calculating E2E latency breakdowns\")\n", + " for i in e2e_latency_breakdown(p)]\n", + "e2e_breakdown_type_hist(conv_items)\n", "\n", "None\n" ], @@ -525,10 +546,60 @@ "execution_count": null, "outputs": [], "source": [ + "%%skip_if_false E2E_ENABLED\n", + "\n", + "from message_tree.message_tree_algorithms import aggregate_e2e_paths\n", + "\n", + "cohorts = aggregate_e2e_paths(all_paths)\n", + "cohort_pairs = [(k, v) for k, v in cohorts.items()]\n", + "cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n", + "\n", + "path_records = [{\"path\": path_key,\n", + " \"timestamp\": path[-1].timestamp,\n", + " \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n", + " for path_key, paths in cohort_pairs for path in paths if path]\n", + "\n", + "out_df = pd.DataFrame.from_records(path_records)\n", + "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n", + "\n", + "mode_cohort_key, mode_cohort = cohort_pairs[0]" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "%%skip_if_false E2E_ENABLED\n", + "%%skip_if_false E2E_PLOT\n", + "\n", + "from message_tree.message_tree_plots import e2e_breakdown_inst_stack\n", + "\n", + "\n", + "mode_cohort_breakdown = [e2e_latency_breakdown(p) for p in mode_cohort[:200]]\n", + "print(len(mode_cohort))\n", + "print(mode_cohort_key.replace(\" -> \", \"\\n -> \"))\n", + "e2e_breakdown_inst_stack(*mode_cohort_breakdown)" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "%%skip_if_false E2E_ENABLED\n", + "%%skip_if_false DEBUG\n", + "\n", "import pickle\n", "\n", "with open(\"trees.pkl\", \"wb\") as f:\n", - " pickle.dump(trees, f)\n" + " pickle.dump(trees, f)" ], "metadata": { "collapsed": false