Added CSV output back in, fixed bugs in refactoring
This commit is contained in:
parent
b1dc01b101
commit
65c21fb6ce
4 changed files with 120 additions and 33 deletions
|
@ -8,7 +8,7 @@ from tqdm import tqdm
|
||||||
|
|
||||||
from latency_graph.latency_graph_structure import LatencyGraph
|
from latency_graph.latency_graph_structure import LatencyGraph
|
||||||
from matching.subscriptions import sanitize
|
from matching.subscriptions import sanitize
|
||||||
from message_tree.message_tree_structure import DepTree, E2EBreakdownItem
|
from message_tree.message_tree_structure import DepTree, E2EBreakdownItem, depth, size
|
||||||
from tracing_interop.tr_types import TrCallbackInstance, TrPublishInstance, TrPublisher, TrCallbackObject, TrContext, \
|
from tracing_interop.tr_types import TrCallbackInstance, TrPublishInstance, TrPublisher, TrCallbackObject, TrContext, \
|
||||||
TrSubscriptionObject, TrTimer, TrNode, TrTopic
|
TrSubscriptionObject, TrTimer, TrNode, TrTopic
|
||||||
|
|
||||||
|
@ -104,7 +104,7 @@ def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limi
|
||||||
pubs = end_topic.publishers
|
pubs = end_topic.publishers
|
||||||
for pub in pubs:
|
for pub in pubs:
|
||||||
msgs = list(pub.instances)
|
msgs = list(pub.instances)
|
||||||
for msg in tqdm(msgs, desc="Processing output messages"):
|
for msg in tqdm(msgs, mininterval=5.0, desc="Processing output messages"):
|
||||||
msg: TrPublishInstance
|
msg: TrPublishInstance
|
||||||
tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s)
|
tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s)
|
||||||
all_trees.append(tree)
|
all_trees.append(tree)
|
||||||
|
@ -233,8 +233,17 @@ def e2e_paths_sorted_desc(tree: DepTree, input_topic_patterns):
|
||||||
`input_topic_patterns`. The paths are sorted by length in a descending manner (element 0 is longest).
|
`input_topic_patterns`. The paths are sorted by length in a descending manner (element 0 is longest).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _collect_all_paths(t: DepTree):
|
# TODO: Make this a Dijkstra/similar implementation instead. This is so slow it's funny.
|
||||||
return [(*_collect_all_paths(d), t.head) for d in t.deps]
|
def _collect_all_paths(t: DepTree, lvl=0):
|
||||||
|
"""
|
||||||
|
Returns a flat list of all paths that lead from t's root to a leaf.
|
||||||
|
"""
|
||||||
|
if lvl > 30 or not t.deps:
|
||||||
|
return [(t.head, )]
|
||||||
|
|
||||||
|
# List of flat lists of paths
|
||||||
|
deps_paths = [_collect_all_paths(d, lvl + 1) for d in t.deps]
|
||||||
|
return [(*path, t.head) for dep_paths in deps_paths for path in dep_paths]
|
||||||
|
|
||||||
def _trim_path(path):
|
def _trim_path(path):
|
||||||
valid_input = False
|
valid_input = False
|
||||||
|
@ -250,14 +259,17 @@ def e2e_paths_sorted_desc(tree: DepTree, input_topic_patterns):
|
||||||
if not valid_input:
|
if not valid_input:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if i == 0 or not isinstance(path[i - 1], TrCallbackInstance):
|
if i == 0:
|
||||||
print(f"[WARN] Message has no publishing callback in dep tree.")
|
#print(end=".")
|
||||||
return path[i:] # Return path from first message that fits an input topic pattern
|
return None
|
||||||
|
if not isinstance(path[i - 1], TrCallbackInstance):
|
||||||
|
#print(end="#")
|
||||||
|
return None
|
||||||
|
|
||||||
return path[i - 1:] # Return path from its publishing callback if it exists
|
return path[i - 1:] # Return path from its publishing callback if it exists
|
||||||
|
|
||||||
paths = _collect_all_paths(tree)
|
paths = _collect_all_paths(tree)
|
||||||
paths = list(filter(lambda p: p is not None, map(_trim_path, tqdm(paths, desc="_trim_path"))))
|
paths = list(filter(None, map(_trim_path, paths)))
|
||||||
paths.sort(key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True)
|
paths.sort(key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True)
|
||||||
return paths
|
return paths
|
||||||
|
|
||||||
|
@ -328,7 +340,7 @@ def _repr_path(path: List[TrPublishInstance | TrCallbackInstance]):
|
||||||
def aggregate_e2e_paths(paths: List[List[TrPublishInstance | TrCallbackInstance]]):
|
def aggregate_e2e_paths(paths: List[List[TrPublishInstance | TrCallbackInstance]]):
|
||||||
path_cohorts = defaultdict(list)
|
path_cohorts = defaultdict(list)
|
||||||
|
|
||||||
for path in paths:
|
for path in tqdm(paths, mininterval=5.0, desc="Aggregating E2E path cohorts"):
|
||||||
key = _repr_path(path)
|
key = _repr_path(path)
|
||||||
path_cohorts[key].append(path)
|
path_cohorts[key].append(path)
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ def e2e_breakdown_inst_stack(*paths: List[E2EBreakdownItem]):
|
||||||
fig.suptitle("Detailed E2E Latency Path Breakdown")
|
fig.suptitle("Detailed E2E Latency Path Breakdown")
|
||||||
|
|
||||||
bottom = 0
|
bottom = 0
|
||||||
for i in range(len(paths)):
|
for i in range(len(paths[0])):
|
||||||
e2e_items = [path[i] for path in paths]
|
e2e_items = [path[i] for path in paths]
|
||||||
durations = np.array([item.duration for item in e2e_items])
|
durations = np.array([item.duration for item in e2e_items])
|
||||||
ax.bar(range(len(paths)), durations, bottom=bottom)
|
ax.bar(range(len(paths)), durations, bottom=bottom)
|
||||||
|
|
|
@ -5,12 +5,16 @@ E2EBreakdownItem = namedtuple("E2EBreakdownItem", ("type", "duration", "location
|
||||||
DepTree = namedtuple("DepTree", ("head", "deps"))
|
DepTree = namedtuple("DepTree", ("head", "deps"))
|
||||||
|
|
||||||
|
|
||||||
def depth(tree: DepTree):
|
def depth(tree: DepTree, lvl=0):
|
||||||
return 1 + max(map(depth, tree.deps), default=0)
|
if lvl > 1000:
|
||||||
|
return 0
|
||||||
|
return 1 + max(map(lambda d: depth(d, lvl + 1), tree.deps), default=0)
|
||||||
|
|
||||||
|
|
||||||
def size(tree: DepTree):
|
def size(tree: DepTree, lvl=0):
|
||||||
return 1 + sum(map(size, tree.deps))
|
if lvl > 1000:
|
||||||
|
return 0
|
||||||
|
return 1 + sum(map(lambda d: size(d, lvl + 1), tree.deps))
|
||||||
|
|
||||||
|
|
||||||
def fanout(tree: DepTree):
|
def fanout(tree: DepTree):
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
"\n",
|
"\n",
|
||||||
"from misc.utils import cached, parse_as\n",
|
"from misc.utils import cached, parse_as\n",
|
||||||
"\n",
|
"\n",
|
||||||
|
"# TODO: This is useless for some reason (goal is to make graphs in notebook large and hi-res)\n",
|
||||||
"plt.rcParams['figure.dpi'] = 300\n",
|
"plt.rcParams['figure.dpi'] = 300\n",
|
||||||
"\n",
|
"\n",
|
||||||
"%matplotlib inline"
|
"%matplotlib inline"
|
||||||
|
@ -134,10 +135,10 @@
|
||||||
"\n",
|
"\n",
|
||||||
"# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n",
|
"# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n",
|
||||||
"# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n",
|
"# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n",
|
||||||
"E2E_EXCLUDED_PATH_PATTERNS = [r\"NDTScanMatcher\"]\n",
|
"E2E_EXCLUDED_PATH_PATTERNS = [r\"NDTScanMatcher\", r\"^/parameter_events\"]\n",
|
||||||
"\n",
|
"\n",
|
||||||
"\n",
|
"\n",
|
||||||
"DDD = False\n",
|
"DEBUG = False\n",
|
||||||
"\n",
|
"\n",
|
||||||
"# This code overrides the above constants with environment variables, do not edit.\n",
|
"# This code overrides the above constants with environment variables, do not edit.\n",
|
||||||
"for env_key, env_value in os.environ.items():\n",
|
"for env_key, env_value in os.environ.items():\n",
|
||||||
|
@ -443,7 +444,7 @@
|
||||||
"source": [
|
"source": [
|
||||||
"%%skip_if_false E2E_ENABLED\n",
|
"%%skip_if_false E2E_ENABLED\n",
|
||||||
"%%skip_if_false E2E_PLOT\n",
|
"%%skip_if_false E2E_PLOT\n",
|
||||||
"%%skip_if_false DDD\n",
|
"%%skip_if_false DEBUG\n",
|
||||||
"\n",
|
"\n",
|
||||||
"\n",
|
"\n",
|
||||||
"fig, ax = plt.subplots(num=\"e2e_plot\")\n",
|
"fig, ax = plt.subplots(num=\"e2e_plot\")\n",
|
||||||
|
@ -486,33 +487,53 @@
|
||||||
"collapsed": false
|
"collapsed": false
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"%%skip_if_false DEBUG\n",
|
||||||
|
"\n",
|
||||||
|
"import pickle\n",
|
||||||
|
"with open(\"trees.pkl\", \"rb\") as f:\n",
|
||||||
|
" trees = pickle.load(f)"
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"collapsed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"cell_type": "code",
|
"cell_type": "code",
|
||||||
"execution_count": null,
|
"execution_count": null,
|
||||||
"outputs": [],
|
"outputs": [],
|
||||||
"source": [
|
"source": [
|
||||||
"%%skip_if_false E2E_ENABLED\n",
|
"%%skip_if_false E2E_ENABLED\n",
|
||||||
"%%skip_if_false E2E_PLOT\n",
|
|
||||||
"\n",
|
|
||||||
"%load_ext line_profiler\n",
|
|
||||||
"\n",
|
|
||||||
"\n",
|
"\n",
|
||||||
"from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n",
|
"from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n",
|
||||||
"from message_tree.message_tree_plots import e2e_breakdown_type_hist\n",
|
"from message_tree.message_tree_plots import e2e_breakdown_type_hist\n",
|
||||||
"\n",
|
"\n",
|
||||||
"def _map_tree_to_e2es(tree):\n",
|
"trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0,\n",
|
||||||
" return e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS)\n",
|
" desc=\"Extracting E2E paths\")]\n",
|
||||||
|
"all_paths = [p for paths in trees_paths for p in paths]\n",
|
||||||
|
"#all_e2e_items = [i for p in all_paths for i in p]"
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"collapsed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"%%skip_if_false E2E_ENABLED\n",
|
||||||
|
"%%skip_if_false E2E_PLOT\n",
|
||||||
"\n",
|
"\n",
|
||||||
"def ppp():\n",
|
"from message_tree.message_tree_algorithms import e2e_latency_breakdown\n",
|
||||||
" trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, desc=\"Extracting E2E paths\")]\n",
|
|
||||||
" all_paths = [p for paths in trees_paths for p in paths]\n",
|
|
||||||
" all_e2e_items = [i for p in all_paths for i in p]\n",
|
|
||||||
"\n",
|
"\n",
|
||||||
" return all_e2e_items\n",
|
"conv_items = [i for p in tqdm(all_paths, mininterval=5.0, desc=\"Calculating E2E latency breakdowns\")\n",
|
||||||
"\n",
|
" for i in e2e_latency_breakdown(p)]\n",
|
||||||
"%lprun -f e2e_paths_sorted_desc ppp()\n",
|
"e2e_breakdown_type_hist(conv_items)\n",
|
||||||
"\n",
|
|
||||||
"e2e_breakdown_type_hist(ppp())\n",
|
|
||||||
"\n",
|
"\n",
|
||||||
"None\n"
|
"None\n"
|
||||||
],
|
],
|
||||||
|
@ -525,10 +546,60 @@
|
||||||
"execution_count": null,
|
"execution_count": null,
|
||||||
"outputs": [],
|
"outputs": [],
|
||||||
"source": [
|
"source": [
|
||||||
|
"%%skip_if_false E2E_ENABLED\n",
|
||||||
|
"\n",
|
||||||
|
"from message_tree.message_tree_algorithms import aggregate_e2e_paths\n",
|
||||||
|
"\n",
|
||||||
|
"cohorts = aggregate_e2e_paths(all_paths)\n",
|
||||||
|
"cohort_pairs = [(k, v) for k, v in cohorts.items()]\n",
|
||||||
|
"cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n",
|
||||||
|
"\n",
|
||||||
|
"path_records = [{\"path\": path_key,\n",
|
||||||
|
" \"timestamp\": path[-1].timestamp,\n",
|
||||||
|
" \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n",
|
||||||
|
" for path_key, paths in cohort_pairs for path in paths if path]\n",
|
||||||
|
"\n",
|
||||||
|
"out_df = pd.DataFrame.from_records(path_records)\n",
|
||||||
|
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n",
|
||||||
|
"\n",
|
||||||
|
"mode_cohort_key, mode_cohort = cohort_pairs[0]"
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"collapsed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"%%skip_if_false E2E_ENABLED\n",
|
||||||
|
"%%skip_if_false E2E_PLOT\n",
|
||||||
|
"\n",
|
||||||
|
"from message_tree.message_tree_plots import e2e_breakdown_inst_stack\n",
|
||||||
|
"\n",
|
||||||
|
"\n",
|
||||||
|
"mode_cohort_breakdown = [e2e_latency_breakdown(p) for p in mode_cohort[:200]]\n",
|
||||||
|
"print(len(mode_cohort))\n",
|
||||||
|
"print(mode_cohort_key.replace(\" -> \", \"\\n -> \"))\n",
|
||||||
|
"e2e_breakdown_inst_stack(*mode_cohort_breakdown)"
|
||||||
|
],
|
||||||
|
"metadata": {
|
||||||
|
"collapsed": false
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"cell_type": "code",
|
||||||
|
"execution_count": null,
|
||||||
|
"outputs": [],
|
||||||
|
"source": [
|
||||||
|
"%%skip_if_false E2E_ENABLED\n",
|
||||||
|
"%%skip_if_false DEBUG\n",
|
||||||
|
"\n",
|
||||||
"import pickle\n",
|
"import pickle\n",
|
||||||
"\n",
|
"\n",
|
||||||
"with open(\"trees.pkl\", \"wb\") as f:\n",
|
"with open(\"trees.pkl\", \"wb\") as f:\n",
|
||||||
" pickle.dump(trees, f)\n"
|
" pickle.dump(trees, f)"
|
||||||
],
|
],
|
||||||
"metadata": {
|
"metadata": {
|
||||||
"collapsed": false
|
"collapsed": false
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue