diff --git a/latency_graph/latency_graph_plots.py b/latency_graph/latency_graph_plots.py
new file mode 100644
index 0000000..dce6363
--- /dev/null
+++ b/latency_graph/latency_graph_plots.py
@@ -0,0 +1,201 @@
+import math
+import re
+
+import graphviz as gv
+
+import latency_graph.latency_graph_structure as lg
+from matching.subscriptions import sanitize
+from tracing_interop.tr_types import TrContext
+
+NODE_COLORS = {
+    "sensing": {"fill": "#e1d5e7", "stroke": "#9673a6"},
+    "localization": {"fill": "#dae8fc", "stroke": "#6c8ebf"},
+    "perception": {"fill": "#d5e8d4", "stroke": "#82b366"},
+    "planning": {"fill": "#fff2cc", "stroke": "#d6b656"},
+    "control": {"fill": "#ffe6cc", "stroke": "#d79b00"},
+    "system": {"fill": "#f8cecc", "stroke": "#b85450"},
+    "vehicle_interface": {"fill": "#b0e3e6", "stroke": "#0e8088"},
+    None: {"fill": "#f5f5f5", "stroke": "#666666"}
+}
+
+NODE_NAMESPACE_MAPPING = {
+    'perception': 'perception',
+    'sensing': 'sensing',
+    'planning': 'planning',
+    'control': 'control',
+    'awapi': 'system',
+    'autoware_api': 'system',
+    'map': 'system',
+    'system': 'system',
+    'localization': 'localization',
+    'robot_state_publisher': None,
+    'aggregator_node': None,
+    'pointcloud_container': 'sensing',
+}
+
+
+def plot_latency_graph_full(lat_graph: lg.LatencyGraph, tr: TrContext, filename: str):
+    # Compare with: https://autowarefoundation.github.io/autoware-documentation/main/design/autoware-architecture/node-diagram/
+
+    g = gv.Digraph('G', filename="latency_graph.gv",
+                   node_attr={'shape': 'plain'},
+                   graph_attr={'pack': '1'})
+    g.graph_attr['rankdir'] = 'LR'
+
+    def plot_hierarchy(gv_parent, lg_node: lg.LGHierarchyLevel, **subgraph_kwargs):
+        if lg_node.name == "[NONE]":
+            return
+
+        print(f"{' ' * lg_node.full_name.count('/')}Processing {lg_node.name}: {len(lg_node.callbacks)}")
+        with gv_parent.subgraph(name=f"cluster_{lg_node.full_name.replace('/', '__')}", **subgraph_kwargs) as c:
+            c.attr(label=lg_node.name)
+            for cb in lg_node.callbacks:
+                if isinstance(cb, lg.LGTrCallback):
+                    tr_cb = cb.cb
+                    try:
+                        sym = tr.callback_symbols.by_id.get(tr_cb.callback_object)
+                        pretty_sym = repr(sanitize(sym.symbol))
+                    except KeyError:
+                        pretty_sym = cb.name
+                    except TypeError:
+                        pretty_sym = cb.name
+                else:
+                    pretty_sym = cb.name
+
+                pretty_sym = pretty_sym.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")
+
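+                # The symbol text was escaped above because it is embedded in the Graphviz HTML-like
+                # label below; the label's "in"/"out" ports are what plot_lg() attaches edges to.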
+                c.node(cb.id(),
+                       f'<<table border="0" cellspacing="0"><tr>'
+                       f'<td port="in"></td><td>{pretty_sym}</td><td port="out"></td>'
+                       f'</tr></table>>')
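+
+            # Recurse so that the clusters of child namespaces are nested inside this namespace's cluster.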
+            for ch in lg_node.children:
+                plot_hierarchy(c, ch, **subgraph_kwargs)
+
+    def plot_lg(graph: lg.LatencyGraph):
+        for top_level_node in graph.top_node.children:
+            colors = NODE_COLORS[NODE_NAMESPACE_MAPPING.get(top_level_node.name)]
+            plot_hierarchy(g, top_level_node, graph_attr={'bgcolor': colors["fill"], 'pencolor': colors["stroke"]})
+
+        for edge in graph.edges:
+            g.edge(f"{edge.start.id()}:out", f"{edge.end.id()}:in")
+
+    plot_lg(lat_graph)
+
+    g.save(f"{filename}.gv")
+    g.render(f"{filename}.svg")
+
+    return g
+
+
+def plot_latency_graph_overview(lat_graph: lg.LatencyGraph, excl_node_patterns, input_node_patterns,
+                                output_node_patterns, max_hier_level, filename):
+    ##################################################
+    # Compute in/out topics for hierarchy level X
+    ##################################################
+
+    def get_nodes_on_level(lat_graph: lg.LatencyGraph):
+        def _traverse_node(node: lg.LGHierarchyLevel, cur_lvl=0):
+            if cur_lvl == max_hier_level:
+                return [node]
+
+            if not node.children:
+                return [node]
+
+            collected_nodes = []
+            for ch in node.children:
+                collected_nodes += _traverse_node(ch, cur_lvl + 1)
+            return collected_nodes
+
+        return _traverse_node(lat_graph.top_node)
+
+    lvl_nodes = get_nodes_on_level(lat_graph)
+    lvl_nodes = [n for n in lvl_nodes if not any(re.search(p, n.full_name) for p in excl_node_patterns)]
+
+    input_nodes = [n.full_name for n in lvl_nodes if any(re.search(p, n.full_name) for p in input_node_patterns)]
+    output_nodes = [n.full_name for n in lvl_nodes if any(re.search(p, n.full_name) for p in output_node_patterns)]
+
+    print(', '.join(input_nodes))
+    print(', '.join(output_nodes))
+    print(', '.join(map(lambda n: n.full_name, lvl_nodes)))
+
+    def _collect_callbacks(n: lg.LGHierarchyLevel):
+        callbacks = []
+        callbacks += n.callbacks
+        for ch in n.children:
+            callbacks += _collect_callbacks(ch)
+        return callbacks
+
+    cb_to_node_map = {}
+    for n in lvl_nodes:
+        cbs = _collect_callbacks(n)
+        for cb in cbs:
+            cb_to_node_map[cb.id()] = n
+
+    edges_between_nodes = {}
+    for edge in lat_graph.edges:
+        from_node = cb_to_node_map.get(edge.start.id())
+        to_node = cb_to_node_map.get(edge.end.id())
+
+        if from_node is None or to_node is None:
+            continue
+
+        if from_node.full_name == to_node.full_name:
+            continue
+
+        k = (from_node.full_name, to_node.full_name)
+
+        if k not in edges_between_nodes:
+            edges_between_nodes[k] = []
+
+        edges_between_nodes[k].append(edge)
+
+    g = gv.Digraph('G', filename="latency_graph.gv",
+                   node_attr={'shape': 'plain'},
+                   graph_attr={'pack': '1'})
+    g.graph_attr['rankdir'] = 'LR'
+
+    for n in lvl_nodes:
+        colors = NODE_COLORS[NODE_NAMESPACE_MAPPING.get(n.full_name.strip("/").split("/")[0])]
+        peripheries = "1" if n.full_name not in output_nodes else "2"
+        g.node(n.full_name, label=n.full_name, fillcolor=colors["fill"], color=colors["stroke"],
+               shape="box", style="filled", peripheries=peripheries)
+
+        if n.full_name in input_nodes:
+            helper_node_name = f"{n.full_name}__before"
+            g.node(helper_node_name, label="", shape="none", height="0", width="0")
+            g.edge(helper_node_name, n.full_name)
+
+    def compute_e2e_paths(start_nodes, end_nodes, edges):
+        frontier_paths = [[n] for n in start_nodes]
+        final_paths = []
+
+        while frontier_paths:
+            frontier_paths_new = []
+
+            for path in frontier_paths:
+                head = path[-1]
+                if head in end_nodes:
+                    final_paths.append(path)
+                    continue
+
+                out_nodes = [n_to for n_from, n_to in edges if n_from == head if n_to not in path]
+                new_paths = [path + [n] for n in out_nodes]
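+                # new_paths extends the current path by one hop along every outgoing edge that does
+                # not revisit a node; paths that already reached an end node were finalized above.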
+                frontier_paths_new += new_paths
+
+            frontier_paths = frontier_paths_new
+
+        final_paths = [[(n_from, n_to)
+                        for n_from, n_to in zip(path[:-1], path[1:])]
+                       for path in final_paths]
+        return final_paths
+
+    e2e_paths = compute_e2e_paths(input_nodes, output_nodes, edges_between_nodes)
+
+    for (src_name, dst_name), edges in edges_between_nodes.items():
+        print(src_name, dst_name, len(edges))
+        color = "black" if any((src_name, dst_name) in path for path in e2e_paths) else "tomato"
+        g.edge(src_name, dst_name, penwidth=str(math.log(len(edges)) * 2 + .2), color=color)
+
+    g.save(f"{filename}.gv")
+    g.render(f"{filename}.svg")
+
+    return g
diff --git a/latency_graph/latency_graph.py b/latency_graph/latency_graph_structure.py
similarity index 96%
rename from latency_graph/latency_graph.py
rename to latency_graph/latency_graph_structure.py
index e2d3d6c..8587d8a 100644
--- a/latency_graph/latency_graph.py
+++ b/latency_graph/latency_graph_structure.py
@@ -1,18 +1,16 @@
 from bisect import bisect_left, bisect_right
 from collections import defaultdict
 from dataclasses import dataclass
-from itertools import combinations
-from multiprocessing import Pool
-from typing import Optional, Set, List, Iterable, Dict, Tuple
 from functools import cache
+from itertools import combinations
+from typing import Optional, Set, List, Iterable, Dict
+from uuid import UUID, uuid4
 
-import numpy as np
 from tqdm import tqdm
-from tqdm.contrib import concurrent
 
 from matching.subscriptions import sanitize
-from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrSubscription, \
-    TrTimer, TrPublishInstance, TrSubscriptionObject, TrTopic, TrCallbackInstance, Timestamp
+from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrTimer, \
+    TrSubscriptionObject, TrTopic
 
 TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics", "/rosout"]
 
@@ -203,6 +201,8 @@ class LatencyGraph:
     cb_pubs: Dict[TrCallbackObject, Set[TrPublisher]]
     pub_cbs: Dict[TrPublisher, Set[TrCallbackObject]]
 
+    _uuid: UUID
+
     def __init__(self, tr: TrContext):
         ##################################################
         # Annotate nodes with their callbacks
@@ -290,3 +290,10 @@ class LatencyGraph:
         ##################################################
 
         self.top_node = _hierarchize(lg_nodes)
+        self._uuid = uuid4()
+
+    def __hash__(self):
+        return hash(self._uuid)
+
+    def __eq__(self, other):
+        return isinstance(other, LatencyGraph) and other._uuid == self._uuid
diff --git a/latency_graph/message_tree.py b/latency_graph/message_tree.py
deleted file mode 100644
index 9544bd7..0000000
--- a/latency_graph/message_tree.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from dataclasses import dataclass
-from typing import List
-
-from tracing_interop.tr_types import TrPublishInstance, TrCallbackInstance
-
-
-@dataclass
-class DepTree:
-    head: TrCallbackInstance | TrPublishInstance
-    deps: List['DepTree']
-
-    def depth(self):
-        return 1 + max(map(DepTree.depth, self.deps), default=0)
-
-    def size(self):
-        return 1 + sum(map(DepTree.size, self.deps))
-
-    def fanout(self):
-        if not self.deps:
-            return 1
-
-        return sum(map(DepTree.fanout, self.deps))
-
-    def e2e_lat(self):
-        return self.head.timestamp - self.critical_path()[-1].timestamp
-
-    def critical_path(self):
-        if not self.deps:
-            return [self.head]
-
-        return [self.head, *min(map(DepTree.critical_path, self.deps), key=lambda ls: ls[-1].timestamp)]
diff --git a/message_tree/__init__.py b/message_tree/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/message_tree/message_tree_algorithms.py b/message_tree/message_tree_algorithms.py
new file mode 100644
index 0000000..6be5df6
--- /dev/null
+++ b/message_tree/message_tree_algorithms.py
@@ -0,0 +1,335 @@
+import re
+from bisect import bisect_right
+from collections import defaultdict
+from functools import cache
+from typing import List
+
+from tqdm import tqdm
+
+from latency_graph.latency_graph_structure import LatencyGraph
+from matching.subscriptions import sanitize
+from message_tree.message_tree_structure import DepTree, E2EBreakdownItem
+from tracing_interop.tr_types import TrCallbackInstance, TrPublishInstance, TrPublisher, TrCallbackObject, TrContext, \
+    TrSubscriptionObject, TrTimer, TrNode, TrTopic
+
+
+__dep_tree_cache = {}
+
+
+def _repr(inst: TrCallbackInstance | TrPublishInstance):
+    """
+    Returns a string representation of `inst` if one can be derived, otherwise an empty string.
+    """
+    match inst:
+        case TrPublishInstance(publisher=pub):
+            return pub.topic_name if pub else ""
+        case TrCallbackInstance(callback_obj=cb_obj):
+            cb_obj: TrCallbackObject
+            return repr(sanitize(cb_obj.callback_symbol.symbol)) if cb_obj and cb_obj.callback_symbol else ""
+    raise TypeError(f"Argument has to be callback or publish instance, is {type(inst).__name__}")
+
+
+def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: LatencyGraph, tr: TrContext,
+                 excluded_path_patterns, time_limit_s):
+    """
+    Finds all (desired) dependencies of a publish/callback instance and returns them as a dependency tree.
+
+    The dependencies can be filtered via `time_limit_s`, the maximum difference in start time between `inst`
+    and any dependency in the tree, and via `excluded_path_patterns`, which cuts off paths at instances whose
+    string representation matches any of the given regex patterns.
+    """
+
+    start_time = inst.timestamp
+
+    def __get_dep_tree(inst, is_dep_cb, visited=None):
+        # If the owner of inst has already been visited, skip it (loop prevention)
+        if visited is not None and owner(inst) in visited:
+            return None
+
+        # If the tree for this instance has been computed before, look it up in the cache
+        cache_key = (inst, is_dep_cb)
+        if cache_key in __dep_tree_cache:
+            return __dep_tree_cache[cache_key]
+
+        if visited is None:
+            visited = tuple()
+
+        if any(re.search(p, _repr(inst)) for p in excluded_path_patterns):
+            return None
+
+        if inst.timestamp - start_time > time_limit_s:
+            return None
+
+        children_are_dep_cbs = False
+
+        match inst:
+            case TrPublishInstance(publisher=pub):
+                deps = [get_msg_dep_cb(inst, lat_graph)]
+            case TrCallbackInstance() as cb_inst:
+                cb_inst: TrCallbackInstance
+                deps = [inst_get_dep_msg(cb_inst, tr)]
+                if not is_dep_cb:
+                    deps += inst_get_dep_insts(cb_inst, tr)
+                    children_are_dep_cbs = True
+            case _:
+                print("[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, "
+                      f"got {type(inst).__name__}")
+                return None
+
+        # print("Rec level", lvl)
+        deps = list(filter(None, deps))
+        # Copy the visited set for each child because separate paths should not interfere with each other
+        deps = [__get_dep_tree(dep, children_are_dep_cbs, {*visited, owner(inst)}) for dep in deps]
+        deps = list(filter(None, deps))
+
+        # Create the tree instance, cache it and return it
+        ret_tree = DepTree(inst, deps)
+        __dep_tree_cache[cache_key] = ret_tree
+        return ret_tree
+
+    return __get_dep_tree(inst, False)
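+
+# Illustrative usage sketch (not part of the module API): given a TrContext `tr`, a LatencyGraph
+# `lat_graph` built from it, and some output topic `end_topic` (a TrTopic), the tree for the newest
+# message on that topic could be built as follows; the filter values below are placeholders.
+#
+#     msg = end_topic.publishers[0].instances[-1]
+#     tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns=[], time_limit_s=1.0)
+#     print(tree.size(), tree.depth())  # assumes DepTree keeps its size()/depth() helpers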
+ """ + + + start_time = inst.timestamp + + def __get_dep_tree(inst, is_dep_cb, visited=None): + + # If inst owner has been visited already, skip (loop prevention) + if visited is not None and owner(inst) in visited: + return None + + # If we want to retrieve the tree, look in the cache first + cache_key = (inst, is_dep_cb) + if cache_key in __dep_tree_cache: + return __dep_tree_cache[cache_key] + + if visited is None: + visited = tuple() + + if any(re.search(p, _repr(inst)) for p in excluded_path_patterns): + return None + + if inst.timestamp - start_time > time_limit_s: + return None + + children_are_dep_cbs = False + + match inst: + case TrPublishInstance(publisher=pub): + deps = [get_msg_dep_cb(inst, lat_graph)] + case TrCallbackInstance() as cb_inst: + cb_inst: TrCallbackInstance + deps = [inst_get_dep_msg(cb_inst, tr)] + if not is_dep_cb: + deps += inst_get_dep_insts(cb_inst, tr) + children_are_dep_cbs = True + case _: + print("[WARN] Expected inst to be of type TrPublishInstance or TrCallbackInstance, " + f"got {type(inst).__name__}") + return None + # print("Rec level", lvl) + deps = list(filter(None, deps)) + # Copy visited set for each child because we don't want separate paths to interfere with each other + deps = [__get_dep_tree(dep, children_are_dep_cbs, {*visited, owner(inst)}) for dep in deps] + deps = list(filter(None, deps)) + + # Create tree instance, cache and return it + ret_tree = DepTree(inst, deps) + __dep_tree_cache[cache_key] = ret_tree + return ret_tree + + return __get_dep_tree(inst, False) + + +def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limit_s): + """ + Builds the dependency trees for all messages published in any of `end_topics` and returns them as a list. + """ + all_trees = [] + for end_topic in end_topics: + end_topic: TrTopic + print(f"====={end_topic.name}") + + pubs = end_topic.publishers + for pub in pubs: + msgs = list(pub.instances) + for msg in tqdm(msgs, desc="Processing output messages"): + msg: TrPublishInstance + tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s) + all_trees.append(tree) + return all_trees + + +def inst_get_dep_msg(inst: TrCallbackInstance, tr: TrContext): + if inst.callback_object not in tr.callback_objects.by_callback_object: + # print("Callback not found (2)") + return None + + if not isinstance(inst.callback_obj.owner, TrSubscriptionObject): + # print(f"Wrong type: {type(inst.callback_obj.owner)}") + return None + + sub_obj: TrSubscriptionObject = inst.callback_obj.owner + if sub_obj and sub_obj.subscription and sub_obj.subscription.topic: + # print(f"Subscription has no topic") + pubs = sub_obj.subscription.topic.publishers + else: + pubs = [] + + def _pub_latest_msg_before(pub: TrPublisher, inst): + i_latest_msg = bisect_right(pub.instances, inst.timestamp, key=lambda x: x.timestamp) - 1 + if i_latest_msg < 0 or i_latest_msg >= len(pub.instances): + return None + latest_msg = pub.instances[i_latest_msg] + if latest_msg.timestamp >= inst.timestamp: + return None + + return latest_msg + + msgs = [_pub_latest_msg_before(pub, inst) for pub in pubs] + msgs = list(filter(None, msgs)) + msgs.sort(key=lambda i: i.timestamp, reverse=True) + if msgs: + msg = msgs[0] + return msg + + # print(f"No messages found for topic {sub_obj.subscription.topic}") + return None + + +def inst_get_dep_insts(inst: TrCallbackInstance, tr: TrContext): + if inst.callback_object not in tr.callback_objects.by_callback_object: + # print("Callback not found") + return [] + dep_cbs = 
+
+
+def inst_get_dep_insts(inst: TrCallbackInstance, tr: TrContext):
+    if inst.callback_object not in tr.callback_objects.by_callback_object:
+        # print("Callback not found")
+        return []
+    dep_cbs = get_cb_dep_cbs(inst.callback_obj, tr)
+
+    def _cb_to_chronological_inst(cb: TrCallbackObject, inst):
+        i_inst_latest = bisect_right(cb.callback_instances, inst.timestamp, key=lambda x: x.timestamp)
+
+        for inst_before in cb.callback_instances[i_inst_latest::-1]:
+            if inst_before.t_end < inst.timestamp:
+                return inst_before
+
+        return None
+
+    insts = [_cb_to_chronological_inst(cb, inst) for cb in dep_cbs]
+    insts = list(filter(None, insts))
+    return insts
+
+
+def get_cb_dep_cbs(cb: TrCallbackObject, tr: TrContext):
+    match cb.owner:
+        case TrSubscriptionObject() as sub_obj:
+            sub_obj: TrSubscriptionObject
+            owner = sub_obj.subscription.node
+        case TrTimer() as tmr:
+            tmr: TrTimer
+            owner = tmr.node
+        case _:
+            raise RuntimeError(f"Encountered {cb.owner} as callback owner")
+
+    owner: TrNode
+    dep_sub_objs = {sub.subscription_object for sub in owner.subscriptions}
+    dep_cbs = {tr.callback_objects.by_id.get(sub_obj.id) for sub_obj in dep_sub_objs if sub_obj is not None}
+    dep_cbs |= {tr.callback_objects.by_id.get(tmr.id) for tmr in owner.timers}
+    dep_cbs.discard(cb)
+    dep_cbs.discard(None)
+
+    return dep_cbs
+
+
+def get_msg_dep_cb(msg: TrPublishInstance, lat_graph: LatencyGraph):
+    """
+    For a given message instance `msg`, finds the callback instance that published it, i.e. the instance
+    of one of the publisher's callbacks whose execution interval contains the publish timestamp.
+    Returns None if no candidate, or more than one candidate, is found.
+    """
+
+    # Find the callback instance that published msg
+    # print(f"PUB {msg.publisher.node.path if msg.publisher.node is not None else '??'} ==> {msg.publisher.topic_name}")
+    pub_cbs = lat_graph.pub_cbs.get(msg.publisher)
+    if pub_cbs is None:
+        # print("Publisher unknown to lat graph. Skipping.")
+        return None
+
+    # print(f"Found {len(pub_cbs)} pub cbs")
+    cb_inst_candidates = []
+    for cb in pub_cbs:
+        # print(f"  > CB ({len(cb.callback_instances)} instances): {cb.callback_symbol.symbol if cb.callback_symbol else cb.id}")
+        i_inst_after = bisect_right(cb.callback_instances, msg.timestamp, key=lambda x: x.timestamp)
+
+        for inst in cb.callback_instances[:i_inst_after]:
+            if msg.timestamp > inst.t_end:
+                continue
+
+            assert inst.t_start <= msg.timestamp <= inst.t_end
+
+            cb_inst_candidates.append(inst)
+
+    if len(cb_inst_candidates) > 1:
+        # print("Found multiple possible callbacks")
+        return None
+    if not cb_inst_candidates:
+        # print("Found no possible callbacks")
+        return None
+
+    dep_inst = cb_inst_candidates[0]
+    return dep_inst
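+
+# A "path", as used below, is a tuple of TrCallbackInstance/TrPublishInstance objects ordered from
+# the input side (earliest element) to the head of the dependency tree (the output publish instance,
+# last element). Consecutive elements need not strictly alternate between callbacks and messages,
+# since a callback can also depend directly on other callbacks of the same node.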
+ """ + + def _collect_all_paths(t: DepTree): + return [(*_collect_all_paths(d), t.head) for d in t.deps] + + def _trim_path(path): + valid_input = False + i = -1 + for i, inst in enumerate(path): + match inst: + case TrPublishInstance(publisher=pub): + pub: TrPublisher + if pub and any(re.search(p, pub.topic_name) for p in input_topic_patterns): + valid_input = True + break + + if not valid_input: + return None + + if i == 0 or not isinstance(path[i - 1], TrCallbackInstance): + print(f"[WARN] Message has no publishing callback in dep tree.") + return path[i:] # Return path from first message that fits an input topic pattern + + return path[i - 1:] # Return path from its publishing callback if it exists + + paths = _collect_all_paths(tree) + paths = list(filter(lambda p: p is not None, map(_trim_path, tqdm(paths, desc="_trim_path")))) + paths.sort(key=lambda path: path[-1].timestamp - path[0].timestamp, reverse=True) + return paths + + +def e2e_latency_breakdown(path: list): + """ + Separates E2E latency into a sequence of dds, idle, and cpu times. + This method expects a publish instance at the last position in `path`. + + The return format is a list of the form [("",