diff --git a/clang_interop/__init__.py b/clang_interop/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/annotate_interactively.py b/clang_interop/process_clang_output.py similarity index 55% rename from annotate_interactively.py rename to clang_interop/process_clang_output.py index b99182a..a6ab91b 100644 --- a/annotate_interactively.py +++ b/clang_interop/process_clang_output.py @@ -3,13 +3,15 @@ import json import os import pickle import re -from dataclasses import dataclass -from typing import Tuple, List, Literal, Iterable +from typing import Tuple, Iterable import numpy as np import pandas as pd import termcolor +from clang_interop.types import ClNode, ClField, ClTimer, ClMethod, ClPublisher, ClSubscription, ClMemberRef, ClContext, \ + ClTranslationUnit + IN_DIR = "/home/max/Projects/llvm-project/clang-tools-extra/ros2-internal-dependency-checker/output" SRC_DIR = "/home/max/Projects/autoware/src" @@ -58,159 +60,10 @@ def fuse_objects(o1, o2): return o1 -@dataclass -class SourceRange: - start_file: str - start_line: int | None - start_col: int | None - - end_file: str - end_line: int | None - end_col: int | None - - def __init__(self, json_obj): - begin = json_obj["begin"].split(":") - end = json_obj["end"].split(":") - - self.start_file = os.path.realpath(begin[0]) - self.start_line = int(begin[1]) if len(begin) > 1 else None - self.start_col = int(begin[2].split(" ")[0]) if len(begin) > 2 else None - - self.end_file = os.path.realpath(end[0]) - self.end_line = int(end[1]) if len(end) > 1 else None - self.end_col = int(end[2].split(" ")[0]) if len(end) > 2 else None - - def __hash__(self): - return hash((self.start_file, self.start_line, self.start_col, - self.end_file, self.end_line, self.end_col)) - - -@dataclass -class Node: - id: int - qualified_name: str - source_range: 'SourceRange' - field_ids: List[int] | None - method_ids: List[int] | None - - def __init__(self, json_obj): - self.id = json_obj['id'] - self.qualified_name = json_obj['id'] - self.source_range = SourceRange(json_obj['source_range']) - self.field_ids = list(map(lambda obj: obj['id'], json_obj['fields'])) if 'fields' in json_obj else None - self.method_ids = list(map(lambda obj: obj['id'], json_obj['methods'])) if 'methods' in json_obj else None - - def __hash__(self): - return hash(self.id) - - -@dataclass -class Method: - id: int - qualified_name: str - source_range: 'SourceRange' - return_type: str | None - parameter_types: List[str] | None - - def __init__(self, json_obj): - self.id = json_obj['id'] - self.qualified_name = json_obj['qualified_name'] - self.source_range = SourceRange(json_obj['source_range']) - self.return_type = json_obj['signature']['return_type'] if 'signature' in json_obj else None - self.parameter_types = json_obj['signature']['parameter_types'] if 'signature' in json_obj else None - - def __hash__(self): - return hash(self.id) - - -@dataclass -class Field: - id: int - qualified_name: str - source_range: 'SourceRange' - - def __init__(self, json_obj): - self.id = json_obj['id'] - self.qualified_name = json_obj['qualified_name'] - self.source_range = SourceRange(json_obj['source_range']) - - def __hash__(self): - return hash(self.id) - - -@dataclass -class MemberRef: - type: Literal["read", "write", "call", "arg", "pub"] | None - member_chain: List[int] - method_id: int | None - node_id: int | None - source_range: 'SourceRange' - - def __init__(self, json_obj): - access_type = json_obj['context']['access_type'] - if access_type == 'none': - access_type = None - 
self.type = access_type - self.member_chain = list(map(lambda obj: obj['id'], json_obj['member'][::-1])) - self.method_id = json_obj['context']['method']['id'] if 'method' in json_obj['context'] else None - self.node_id = json_obj['context']['node']['id'] if 'node' in json_obj['context'] else None - self.source_range = SourceRange(json_obj['context']['statement']['source_range']) - - def __hash__(self): - return self.source_range.__hash__() - - -@dataclass -class Subscription: - topic: str | None - callback_id: int | None - source_range: 'SourceRange' - - def __init__(self, json_obj): - self.topic = json_obj['topic'] if 'topic' in json_obj else None - self.callback_id = json_obj['callback']['id'] if 'callback' in json_obj else None - self.source_range = SourceRange(json_obj['source_range']) - - def __hash__(self): - return self.source_range.__hash__() - - -@dataclass -class Publisher: - topic: str | None - member_id: int | None - source_range: 'SourceRange' - - def update(self, t2: 'Timer'): - return self - - def __init__(self, json_obj): - self.topic = json_obj['topic'] if 'topic' in json_obj else None - self.member_id = json_obj['member']['id'] if 'member' in json_obj else None - self.source_range = SourceRange(json_obj['source_range']) - - def __hash__(self): - return self.source_range.__hash__() - - -@dataclass -class Timer: - callback_id: int | None - source_range: 'SourceRange' - - def __init__(self, json_obj): - self.callback_id = json_obj['callback']['id'] if 'callback' in json_obj else None - self.source_range = SourceRange(json_obj['source_range']) - - def __hash__(self): - return self.source_range.__hash__() - - -def find_data_deps(nodes: Iterable[Node], pubs: Iterable[Publisher], subs: Iterable[Subscription], - timers: Iterable[Timer], fields, methods, accesses: Iterable[MemberRef]): +def find_data_deps(accesses: Iterable[ClMemberRef]): writes = set() reads = set() - publications = set() + publications = {} for member_ref in accesses: member_id = member_ref.member_chain[0] if member_ref.member_chain else None @@ -229,7 +82,9 @@ def find_data_deps(nodes: Iterable[Node], pubs: Iterable[Publisher], subs: Itera writes.add(dep_tuple) reads.add(dep_tuple) case "pub": - publications.add(dep_tuple) + if member_ref.method_id not in publications: + publications[member_ref.method_id] = set() + publications[member_ref.method_id].add(member_id) reads = pd.DataFrame.from_records(list(reads), columns=['method_id', 'member_id']) writes = pd.DataFrame.from_records(list(writes), columns=['method_id', 'member_id']) @@ -250,7 +105,7 @@ def find_data_deps(nodes: Iterable[Node], pubs: Iterable[Publisher], subs: Itera deps[reading_method].discard(reading_method) # Remove reflexive dependencies - return publications, deps + return deps, publications def dedup(elems): @@ -274,6 +129,10 @@ def dedup(elems): return ret_list +def dictify(elems, key='id'): + return {getattr(e, key): e for e in elems} + + def definitions_from_json(cb_dict): nodes = [] pubs = [] @@ -285,35 +144,35 @@ def definitions_from_json(cb_dict): if "nodes" in cb_dict: for node in cb_dict["nodes"]: - nodes.append(Node(node)) + nodes.append(ClNode(node)) for field in node["fields"]: - fields.append(Field(field)) + fields.append(ClField(field)) for method in node["methods"]: - methods.append(Method(method)) + methods.append(ClMethod(method)) if "publishers" in cb_dict: for publisher in cb_dict["publishers"]: - pubs.append(Publisher(publisher)) + pubs.append(ClPublisher(publisher)) if "subscriptions" in cb_dict: for subscription in 
cb_dict["subscriptions"]: - subs.append(Subscription(subscription)) + subs.append(ClSubscription(subscription)) if "timers" in cb_dict: for timer in cb_dict["timers"]: - timers.append(Timer(timer)) + timers.append(ClTimer(timer)) if "accesses" in cb_dict: for access_type in cb_dict["accesses"]: for access in cb_dict["accesses"][access_type]: - accesses.append(MemberRef(access)) + accesses.append(ClMemberRef(access)) - nodes = dedup(nodes) - pubs = dedup(pubs) - subs = dedup(subs) - timers = dedup(timers) - fields = dedup(fields) - methods = dedup(methods) + nodes = dictify(dedup(nodes)) + pubs = dictify(dedup(pubs), key='member_id') + subs = dictify(dedup(subs), key='callback_id') + timers = dictify(dedup(timers), key='callback_id') + fields = dictify(dedup(fields)) + methods = dictify(dedup(methods)) return nodes, pubs, subs, timers, fields, methods, accesses @@ -341,7 +200,7 @@ def prompt_user(file: str, cb: str, idf: str, text: str) -> Tuple[str, bool, boo return answer, answer == "q", answer == "z" -def main(nodes, cbs, fields, methods): +def main(cbs): open_files = {} cb_rw_dict = {} @@ -407,8 +266,8 @@ def main(nodes, cbs, fields, methods): print("Done.") -if __name__ == "__main__": - out_dict = {} +def process_clang_output(directory=IN_DIR): + clang_context = ClContext() for filename in os.listdir(IN_DIR): source_filename = SRC_FILE_NAME(filename) @@ -418,22 +277,20 @@ if __name__ == "__main__": if cb_dict is None: print(f" [WARN ] Empty tool output detected in {filename}") continue - definitions = definitions_from_json(cb_dict) - deps, publications = find_data_deps(*definitions) - (nodes, pubs, subs, timers, fields, methods, accesses) = definitions - out_dict[source_filename] = { - "dependencies": deps, - "publications": publications, - "nodes": nodes, - "publishers": pubs, - "subscriptions": subs, - "timers": timers, - "fields": fields, - "methods": methods - } + nodes, pubs, subs, timers, fields, methods, accesses = definitions_from_json(cb_dict) + deps, publications = find_data_deps(accesses) + + tu = ClTranslationUnit(deps, publications, nodes, pubs, subs, timers, fields, methods, accesses) + clang_context.translation_units[source_filename] = tu + + return clang_context + + +if __name__ == "__main__": + clang_context = process_clang_output() with open(OUT_NAME, "wb") as f: - pickle.dump(out_dict, f) + pickle.dump(clang_context, f) print("Done.") diff --git a/clang_interop/types.py b/clang_interop/types.py new file mode 100644 index 0000000..b75baf9 --- /dev/null +++ b/clang_interop/types.py @@ -0,0 +1,173 @@ +import os +from dataclasses import dataclass, field +from typing import List, Literal, Dict, Set + + +@dataclass +class ClTranslationUnit: + dependencies: Dict[int, Set[int]] + publications: Dict[int, Set[int]] + nodes: Dict[int, 'ClNode'] + publishers: Dict[int, 'ClPublisher'] + subscriptions: Dict[int, 'ClSubscription'] + timers: Dict[int, 'ClTimer'] + fields: Dict[int, 'ClField'] + methods: Dict[int, 'ClMethod'] + accesses: List['ClMemberRef'] + + +@dataclass +class ClContext: + translation_units: Dict[str, 'ClTranslationUnit'] = field(default_factory=dict) + + +@dataclass +class ClSourceRange: + start_file: str + start_line: int | None + start_col: int | None + + end_file: str + end_line: int | None + end_col: int | None + + def __init__(self, json_obj): + begin = json_obj["begin"].split(":") + end = json_obj["end"].split(":") + + self.start_file = os.path.realpath(begin[0]) + self.start_line = int(begin[1]) if len(begin) > 1 else None + self.start_col = 
diff --git a/clang_interop/types.py b/clang_interop/types.py
new file mode 100644
index 0000000..b75baf9
--- /dev/null
+++ b/clang_interop/types.py
@@ -0,0 +1,173 @@
+import os
+from dataclasses import dataclass, field
+from typing import List, Literal, Dict, Set
+
+
+@dataclass
+class ClTranslationUnit:
+    dependencies: Dict[int, Set[int]]
+    publications: Dict[int, Set[int]]
+    nodes: Dict[int, 'ClNode']
+    publishers: Dict[int, 'ClPublisher']
+    subscriptions: Dict[int, 'ClSubscription']
+    timers: Dict[int, 'ClTimer']
+    fields: Dict[int, 'ClField']
+    methods: Dict[int, 'ClMethod']
+    accesses: List['ClMemberRef']
+
+
+@dataclass
+class ClContext:
+    translation_units: Dict[str, 'ClTranslationUnit'] = field(default_factory=dict)
+
+
+@dataclass
+class ClSourceRange:
+    start_file: str
+    start_line: int | None
+    start_col: int | None
+
+    end_file: str
+    end_line: int | None
+    end_col: int | None
+
+    def __init__(self, json_obj):
+        begin = json_obj["begin"].split(":")
+        end = json_obj["end"].split(":")
+
+        self.start_file = os.path.realpath(begin[0])
+        self.start_line = int(begin[1]) if len(begin) > 1 else None
+        self.start_col = int(begin[2].split(" ")[0]) if len(begin) > 2 else None
+
+        self.end_file = os.path.realpath(end[0])
+        self.end_line = int(end[1]) if len(end) > 1 else None
+        self.end_col = int(end[2].split(" ")[0]) if len(end) > 2 else None
+
+    def __hash__(self):
+        return hash((self.start_file, self.start_line, self.start_col,
+                     self.end_file, self.end_line, self.end_col))
+
+
+@dataclass
+class ClNode:
+    id: int
+    qualified_name: str
+    source_range: 'ClSourceRange'
+    field_ids: List[int] | None
+    method_ids: List[int] | None
+    ros_name: str | None
+    ros_namespace: str | None
+
+    def __init__(self, json_obj):
+        self.id = json_obj['id']
+        self.qualified_name = json_obj['qualified_name']
+        self.source_range = ClSourceRange(json_obj['source_range'])
+        self.field_ids = list(map(lambda obj: obj['id'], json_obj['fields'])) if 'fields' in json_obj else None
+        self.method_ids = list(map(lambda obj: obj['id'], json_obj['methods'])) if 'methods' in json_obj else None
+        self.ros_name = json_obj['ros_name'] if 'ros_name' in json_obj else None
+        self.ros_namespace = json_obj['ros_namespace'] if 'ros_namespace' in json_obj else None
+
+    def __hash__(self):
+        return hash(self.id)
+
+
+@dataclass
+class ClMethod:
+    id: int
+    qualified_name: str
+    source_range: 'ClSourceRange'
+    return_type: str | None
+    parameter_types: List[str] | None
+
+    def __init__(self, json_obj):
+        self.id = json_obj['id']
+        self.qualified_name = json_obj['qualified_name']
+        self.source_range = ClSourceRange(json_obj['source_range'])
+        self.return_type = json_obj['signature']['return_type'] if 'signature' in json_obj else None
+        self.parameter_types = json_obj['signature']['parameter_types'] if 'signature' in json_obj else None
+
+    def __hash__(self):
+        return hash(self.id)
+
+
+@dataclass
+class ClField:
+    id: int
+    qualified_name: str
+    source_range: 'ClSourceRange'
+
+    def __init__(self, json_obj):
+        self.id = json_obj['id']
+        self.qualified_name = json_obj['qualified_name']
+        self.source_range = ClSourceRange(json_obj['source_range'])
+
+    def __hash__(self):
+        return hash(self.id)
+
+
+@dataclass
+class ClMemberRef:
+    type: Literal["read", "write", "call", "arg", "pub"] | None
+    member_chain: List[int]
+    method_id: int | None
+    node_id: int | None
+    source_range: 'ClSourceRange'
+
+    def __init__(self, json_obj):
+        access_type = json_obj['context']['access_type']
+        if access_type == 'none':
+            access_type = None
+        self.type = access_type
+        self.member_chain = list(map(lambda obj: obj['id'], json_obj['member'][::-1]))
+        self.method_id = json_obj['context']['method']['id'] if 'method' in json_obj['context'] else None
+        self.node_id = json_obj['context']['node']['id'] if 'node' in json_obj['context'] else None
+        self.source_range = ClSourceRange(json_obj['context']['statement']['source_range'])
+
+    def __hash__(self):
+        return self.source_range.__hash__()
+
+
+@dataclass
+class ClSubscription:
+    topic: str | None
+    callback_id: int | None
+    source_range: 'ClSourceRange'
+
+    def __init__(self, json_obj):
+        self.topic = json_obj['topic'] if 'topic' in json_obj else None
+        self.callback_id = json_obj['callback']['id'] if 'callback' in json_obj else None
+        self.source_range = ClSourceRange(json_obj['source_range'])
+
+    def __hash__(self):
+        return self.source_range.__hash__()
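+
+# Identity note (sketch): ClNode, ClMethod and ClField hash by their clang id,
+# while ClSubscription, ClPublisher, ClTimer and ClMemberRef hash by their
+# source range. dedup() in process_clang_output.py leans on these hashes to
+# fuse duplicate records referring to the same declaration.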
+
+
+@dataclass
+class ClPublisher:
+    topic: str | None
+    member_id: int | None
+    source_range: 'ClSourceRange'
+
+    def update(self, t2: 'ClTimer'):
+        return self
+
+    def __init__(self, json_obj):
+        self.topic = json_obj['topic'] if 'topic' in json_obj else None
+        self.member_id = json_obj['member']['id'] if 'member' in json_obj else None
+        self.source_range = ClSourceRange(json_obj['source_range'])
+
+    def __hash__(self):
+        return self.source_range.__hash__()
+
+
+@dataclass
+class ClTimer:
+    callback_id: int | None
+    source_range: 'ClSourceRange'
+
+    def __init__(self, json_obj):
+        self.callback_id = json_obj['callback']['id'] if 'callback' in json_obj else None
+        self.source_range = ClSourceRange(json_obj['source_range'])
+
+    def __hash__(self):
+        return self.source_range.__hash__()
diff --git a/misc/__init__.py b/misc/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/misc/utils.py b/misc/utils.py
new file mode 100644
index 0000000..2b68226
--- /dev/null
+++ b/misc/utils.py
@@ -0,0 +1,77 @@
+import hashlib
+import json
+import os
+import pickle
+from typing import List
+
+
+def left_abbreviate(string, limit=120):
+    return string if len(string) <= limit else f"...{string[-(limit - 3):]}"
+
+
+class ProgressPrinter:
+    def __init__(self, verb, n) -> None:
+        self.verb = verb
+        self.n = n
+        self.i = 0
+        self.fmt_len = len(str(n))
+
+    def step(self, msg):
+        self.i += 1
+        print(f"({self.i:>{self.fmt_len}d}/{self.n}) {self.verb} {left_abbreviate(msg):<120}", end="\r")
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        self.i -= 1
+
+        if exc_value:
+            self.step("error.")
+            print()
+            print(exc_value)
+            return
+
+        self.step("done.")
+        print()
+
+
+def stable_hash(obj):
+    return hashlib.md5(json.dumps(obj).encode("utf-8")).hexdigest()[:10]
+
+
+def cached(name, function, file_deps: List[str]):
+    if not os.path.isdir("cache"):
+        os.makedirs("cache", exist_ok=True)
+
+    dep_time = 0.0
+    for file in file_deps:
+        m_time = os.path.getmtime(file) if os.path.exists(file) else 0.
+        if m_time > dep_time:
+            dep_time = m_time
+
+    deps_hash = stable_hash(sorted(file_deps))
+    pkl_filename = f"cache/{name}_{deps_hash}.pkl"
+
+    pkl_time = os.path.getmtime(pkl_filename) if os.path.exists(pkl_filename) else 0.
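+    # pkl_time is the cache file's mtime (0.0 if absent) and dep_time the
+    # newest mtime among file_deps; the cache is reused only when it is
+    # strictly newer than every dependency.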
+ + if pkl_time > dep_time: + with open(pkl_filename, "rb") as f: + print(f"[CACHE] Found up-to-date cache entry for {name}, loading.") + return pickle.load(f) + + if os.path.exists(pkl_filename): + print(f"[CACHE] Data dependencies for {name} changed, deleting cached version.") + os.remove(pkl_filename) + + print(f"[CACHE] Creating cache entry for {name} (in {pkl_filename}).") + obj = function() + + with open(pkl_filename, "wb") as f: + pickle.dump(obj, f) + + return obj diff --git a/trace-analysis.ipynb b/trace-analysis.ipynb index 5f8a2a7..632c37b 100644 --- a/trace-analysis.ipynb +++ b/trace-analysis.ipynb @@ -12,11 +12,13 @@ "source": [ "import os\n", "import sys\n", + "\n", "import numpy as np\n", "import pandas as pd\n", "from matplotlib import pyplot as plt\n", "\n", - "from IPython.display import display, clear_output\n", + "from clang_interop.process_clang_output import process_clang_output\n", + "from clang_interop.types import ClContext\n", "\n", "sys.path.append(\"../autoware/build/tracetools_read/\")\n", "sys.path.append(\"../autoware/build/tracetools_analysis/\")\n", @@ -26,12 +28,11 @@ "from tracetools_analysis.utils.ros2 import Ros2DataModelUtil\n", "\n", "from dataclasses import dataclass\n", - "from typing import List, Dict, Set, Union, Tuple\n", - "from functools import cached_property\n", - "import pickle\n", - "import re\n", + "from typing import List, Dict, Set, Tuple\n", "\n", - "from utils import ProgressPrinter" + "from tracing_interop.types import TrTimer, TrTopic, TrPublisher, TrPublishInstance, TrCallbackInstance, \\\n", + " TrCallbackSymbol, TrCallbackObject, TrSubscriptionObject, TrContext, TrNode\n", + "from misc.utils import ProgressPrinter, cached" ] }, { @@ -44,83 +45,8 @@ }, "outputs": [], "source": [ - "def pkl_filename_from_file_timestamp(file_path):\n", - " if os.path.exists(file_path):\n", - " timestamp = os.path.getmtime(file_path)\n", - " pkl_filename = f\"ros_objects_{hash(timestamp)}.pkl\"\n", - " return pkl_filename, os.path.exists(pkl_filename)\n", - " return None, False\n", - "\n", - "path = os.path.expanduser(\"~/Downloads/autoware-trace/ust\")\n", - "path_converted = os.path.join(path, 'converted')\n", - "pkl_filename, pkl_exists = pkl_filename_from_file_timestamp(path_converted)\n", - "\n", - "if not pkl_exists:\n", - " file = load_file(path)\n", - " handler = Ros2Handler.process(file)\n", - " util = Ros2DataModelUtil(handler)\n", - " pkl_filename, pkl_exists = pkl_filename_from_file_timestamp(path_converted)" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": { - "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "if False:\n", - " n=3\n", - " self = handler.data\n", - " print('====================ROS 2 DATA MODEL===================')\n", - " print('██ Contexts: ██')\n", - " print(self.contexts[:n].to_string())\n", - " print('██ Nodes: ██')\n", - " print(self.nodes[:n].to_string())\n", - " print('██ Publishers (rmw): ██')\n", - " print(self.rmw_publishers[:n].to_string())\n", - " print('██ Publishers (rcl): ██')\n", - " print(self.rcl_publishers[:n].to_string())\n", - " print('██ Subscriptions (rmw): ██')\n", - " print(self.rmw_subscriptions[:n].to_string())\n", - " print('██ Subscriptions (rcl): ██')\n", - " print(self.rcl_subscriptions[:n].to_string())\n", - " print('██ Subscription objects: ██')\n", - " print(self.subscription_objects[:n].to_string())\n", - " print('██ Services: ██')\n", - " print(self.services[:n].to_string())\n", - " print('██ Clients: ██')\n", - " 
print(self.clients[:n].to_string())\n", - " print('██ Timers: ██')\n", - " print(self.timers[:n].to_string())\n", - " print('██ Timer-node links: ██')\n", - " print(self.timer_node_links[:n].to_string())\n", - " print('██ Callback objects: ██')\n", - " print(self.callback_objects[:n].to_string())\n", - " print('██ Callback symbols: ██')\n", - " print(self.callback_symbols[:n].to_string())\n", - " print('██ Callback instances: ██')\n", - " print(self.callback_instances[:n].to_string())\n", - " print('██ Publish instances (rclcpp): ██')\n", - " print(self.rclcpp_publish_instances[:n].to_string())\n", - " print('██ Publish instances (rcl): ██')\n", - " print(self.rcl_publish_instances[:n].to_string())\n", - " print('██ Publish instances (rmw): ██')\n", - " print(self.rmw_publish_instances[:n].to_string())\n", - " print('██ Take instances (rmw): ██')\n", - " print(self.rmw_take_instances[:n].to_string())\n", - " print('██ Take instances (rcl): ██')\n", - " print(self.rcl_take_instances[:n].to_string())\n", - " print('██ Take instances (rclcpp): ██')\n", - " print(self.rclcpp_take_instances[:n].to_string())\n", - " print('██ Lifecycle state machines: ██')\n", - " print(self.lifecycle_state_machines[:n].to_string())\n", - " print('██ Lifecycle transitions: ██')\n", - " print(self.lifecycle_transitions[:n].to_string())\n", - " print('==================================================')" + "TR_PATH = os.path.expanduser(\"~/Downloads/autoware-trace/ust\")\n", + "CL_PATH = os.path.expanduser(\"~/Projects/llvm-project/clang-tools-extra/ros2-internal-dependency-checker/output\")" ] }, { @@ -131,278 +57,16 @@ } }, "source": [ - "# Data Structures" + "# Organize Trace Data" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": { "pycharm": { - "name": "#%%\n" - } - }, - "outputs": [], - "source": [ - "def str_to_cls(classname):\n", - " return getattr(sys.modules[__name__], classname)\n", - "\n", - "def row_to_type(row, type, has_idx):\n", - " return type(id=row.name, **row) if has_idx else type(**row)\n", - "\n", - "def df_to_type_list(df, type):\n", - " if isinstance(type, str):\n", - " type = str_to_cls(type)\n", - " \n", - " has_idx = not isinstance(df.index, pd.RangeIndex)\n", - " return [row_to_type(row, type, has_idx) for _, row in df.iterrows()]\n", - "\n", - "def by_index(df, index, type):\n", - " return df_to_type_list(df.loc[index], type)\n", - "\n", - "def by_column(df, column_name, column_val, type):\n", - " return df_to_type_list(df[df[column_name] == column_val], type)\n", - "\n", - "def list_to_dict(ls, key='id'):\n", - " return {getattr(item, key): item for item in ls}\n", - "\n", - "#################################\n", - "# Predefined (from ROS2DataModel)\n", - "#################################\n", - "\n", - "@dataclass\n", - "class Node:\n", - " id: int\n", - " timestamp: int\n", - " tid: int\n", - " rmw_handle: int\n", - " name: str\n", - " namespace: str\n", - "\n", - " @cached_property\n", - " def path(self) -> str:\n", - " return '/'.join((self.namespace, self.name))\n", - "\n", - " @cached_property\n", - " def publishers(self) -> List['Publisher']:\n", - " return list(filter(lambda pub: pub.node_handle == self.id, publishers.values()))\n", - "\n", - " @cached_property\n", - " def subscriptions(self) -> List['Subscription']:\n", - " return list(filter(lambda sub: sub.node_handle == self.id, subscriptions.values()))\n", - " \n", - " @cached_property\n", - " def timers(self) -> List['Timer']:\n", - " links = [link.id for link in 
timer_node_links.values() if link.node_handle == self.id]\n", - " return list(filter(lambda timer: timer.id in links, timers.values()))\n", - "\n", - " def __hash__(self):\n", - " return hash(self.id)\n", - "\n", - "@dataclass\n", - "class Publisher:\n", - " id: int\n", - " timestamp: int\n", - " node_handle: int\n", - " rmw_handle: int\n", - " topic_name: str\n", - " depth: int\n", - "\n", - " @property\n", - " def node(self) -> 'Node':\n", - " return nodes[self.node_handle]\n", - "\n", - " @cached_property\n", - " def subscriptions(self) -> List['Subscription']:\n", - " return list(filter(lambda sub: sub.topic_name == self.topic_name, subscriptions.values()))\n", - "\n", - " @cached_property\n", - " def instances(self) -> List['PublishInstance']:\n", - " return list(filter(lambda inst: inst.publisher_handle == self.id, publish_instances))\n", - " \n", - " @property\n", - " def topic(self) -> 'Topic':\n", - " return topics[self.topic_name]\n", - "\n", - " def __hash__(self):\n", - " return hash(self.id)\n", - "\n", - "\n", - "@dataclass\n", - "class Subscription:\n", - " id: int\n", - " timestamp: int\n", - " node_handle: int\n", - " rmw_handle: int\n", - " topic_name: str\n", - " depth: int\n", - "\n", - " @property\n", - " def node(self) -> 'Node':\n", - " return nodes[self.node_handle]\n", - "\n", - " @cached_property\n", - " def publishers(self) -> List['Publisher']:\n", - " return list(filter(lambda pub: pub.topic_name == self.topic_name, publishers.values()))\n", - " \n", - " @cached_property\n", - " def subscription_objects(self) -> 'SubscriptionObject':\n", - " return list(filter(lambda sub_obj: sub_obj.subscription_handle == self.id, subscription_objects.values()))\n", - "\n", - " @property\n", - " def topic(self) -> 'Topic':\n", - " return topics[self.topic_name]\n", - "\n", - " def __hash__(self):\n", - " return hash(self.id)\n", - " \n", - "@dataclass\n", - "class Timer:\n", - " id: int\n", - " timestamp: int\n", - " period: int\n", - " tid: int\n", - "\n", - " @cached_property\n", - " def nodes(self) -> List['Node']:\n", - " links = [link.node_handle for link in timer_node_links.values() if link.id == self.id]\n", - " return list(filter(lambda node: node.id in links, nodes.values()))\n", - " \n", - " @property\n", - " def callback_object(self) -> 'CallbackObject':\n", - " return callback_objects[self.id]\n", - "\n", - " def __hash__(self):\n", - " return hash(self.id)\n", - "\n", - "@dataclass\n", - "class TimerNodeLink:\n", - " id: int\n", - " timestamp: int\n", - " node_handle: int\n", - "\n", - "@dataclass\n", - "class SubscriptionObject:\n", - " id: int # subscription\n", - " timestamp: int\n", - " subscription_handle: int\n", - "\n", - " @property\n", - " def subscription(self) -> 'Subscription':\n", - " return subscriptions[self.subscription_handle]\n", - "\n", - " @property\n", - " def callback_object(self) -> 'CallbackObject':\n", - " return callback_objects[self.id]\n", - "\n", - " def __hash__(self):\n", - " return hash((self.id, self.timestamp, self.subscription_handle))\n", - "\n", - "@dataclass\n", - "class CallbackObject:\n", - " id: int # (reference) = subscription_object.id | timer.id | ....\n", - " timestamp: int\n", - " callback_object: int\n", - "\n", - " @cached_property\n", - " def callback_instances(self) -> List['CallbackInstance']:\n", - " return list(filter(lambda inst: inst.callback_object == self.callback_object, callback_instances))\n", - "\n", - " @cached_property\n", - " def owner(self):\n", - " if self.id in timers:\n", - " return 
timers[self.id]\n", - " if self.id in publishers:\n", - " return publishers[self.id]\n", - " if self.id in subscription_objects:\n", - " return subscription_objects[self.id]\n", - " if self.id in handler.data.services.index:\n", - " return 'Service'\n", - " if self.id in handler.data.clients.index:\n", - " return 'Client'\n", - " return None\n", - "\n", - " @cached_property\n", - " def owner_info(self):\n", - " info = util.get_callback_owner_info(self.callback_object)\n", - " if info is None:\n", - " return None, None\n", - " \n", - " type_name, dict_str = info.split(\" -- \")\n", - " kv_strs = dict_str.split(\", \")\n", - " info_dict = {k: v for k, v in map(lambda kv_str: kv_str.split(\": \", maxsplit=1), kv_strs)}\n", - " return type_name, info_dict\n", - "\n", - " def __hash__(self):\n", - " return hash((self.id, self.timestamp, self.callback_object))\n", - "\n", - "@dataclass\n", - "class PublishInstance:\n", - " publisher_handle: int\n", - " timestamp: int\n", - " message: int\n", - "\n", - " @property\n", - " def publisher(self) -> 'Publisher':\n", - " return publishers[self.publisher_handle]\n", - "\n", - " def __hash__(self):\n", - " return hash((self.publisher_handle, self.timestamp, self.message))\n", - "\n", - "@dataclass\n", - "class CallbackInstance:\n", - " callback_object: int\n", - " timestamp: pd.Timestamp\n", - " duration: pd.Timedelta\n", - " intra_process: bool\n", - "\n", - " @property\n", - " def callback_obj(self) -> 'CallbackObject':\n", - " return callback_objects[self.callback_object]\n", - "\n", - " def __hash__(self):\n", - " return hash((self.callback_object, self.timestamp, self.duration))\n", - "\n", - "@dataclass\n", - "class CallbackSymbol:\n", - " id: int # callback_object\n", - " timestamp: int\n", - " symbol: str\n", - "\n", - " @cached_property\n", - " def callback_objs(self) -> List['CallbackObject']:\n", - " return list(filter(lambda cb_obj: cb_obj.callback_object == self.id, callback_objects.values()))\n", - "\n", - " def __hash__(self):\n", - " return hash((self.id, self.timestamp, self.symbol))\n", - "\n", - "\n", - "#######################################\n", - "# Self-defined (not from ROS2DataModel)\n", - "#######################################\n", - "\n", - "@dataclass\n", - "class Topic:\n", - " name: str\n", - "\n", - " @cached_property\n", - " def publishers(self) -> List['Publisher']:\n", - " return list(filter(lambda pub: pub.topic_name == self.name, publishers.values()))\n", - " \n", - " @cached_property\n", - " def subscriptions(self) -> List['Subscription']:\n", - " return list(filter(lambda sub: sub.topic_name == self.name, subscriptions.values()))\n", - "\n", - " def __hash__(self):\n", - " return hash(self.name)\n" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": { - "pycharm": { - "name": "#%%\n" + "name": "#%%\n", + "is_executing": true } }, "outputs": [ @@ -410,67 +74,38 @@ "name": "stdout", "output_type": "stream", "text": [ - "Found pickled ROS objects from previous session, restoring...\n", - "Done.\n" + "[CACHE] Creating cache entry for tr_objects (in cache/tr_objects_c1e0d50b8d.pkl).\n", + "found converted file: /home/max/Downloads/autoware-trace/ust/converted\n", + " [100%] [Ros2Handler]\r\n", + "[TrContext] Processing ROS 2 objects from traces...\n", + " ├─ Processed 216 nodes\n", + " ├─ Processed 668 publishers\n", + " ├─ Processed 748 subscriptions\n", + " ├─ Processed 154 timers\n", + " ├─ Processed 69 timer-node links\n", + " ├─ Processed 824 subscription objects\n", + " ├─ Processed 1842 
callback objects\n", + " ├─ Processed 1842 callback symbols\n", + " ├─ Processed 70808 publish instances\n" ] } ], "source": [ - "if not pkl_exists:\n", - " print(\"Did not find pickled ROS objects, extracting...\")\n", - " #######################################\n", - " # Instantiate collections\n", - " #######################################\n", + "def _load_traces():\n", + " file = load_file(TR_PATH)\n", + " handler = Ros2Handler.process(file)\n", + " util = Ros2DataModelUtil(handler)\n", + " return TrContext(util, handler)\n", "\n", - " nodes: Dict[int, 'Node'] = list_to_dict(df_to_type_list(handler.data.nodes, 'Node')); print(f\"Processed {len(nodes):<8d} nodes\")\n", - " publishers: Dict[int, 'Publisher'] = list_to_dict(df_to_type_list(handler.data.rcl_publishers, 'Publisher')); print(f\"Processed {len(publishers):<8d} publishers\")\n", - " subscriptions: Dict[int, 'Subscription'] = list_to_dict(df_to_type_list(handler.data.rcl_subscriptions, 'Subscription')); print(f\"Processed {len(subscriptions):<8d} subscriptions\")\n", - " timers: Dict[int, 'Timer'] = list_to_dict(df_to_type_list(handler.data.timers, 'Timer')); print(f\"Processed {len(timers):<8d} timers\")\n", - " timer_node_links: Dict[int, 'TimerNodeLink'] = list_to_dict(df_to_type_list(handler.data.timer_node_links, 'TimerNodeLink')); print(f\"Processed {len(timer_node_links):<8d} timer-node links\")\n", - " subscription_objects: Dict[int, 'SubscriptionObject'] = list_to_dict(df_to_type_list(handler.data.subscription_objects, 'SubscriptionObject')); print(f\"Processed {len(subscription_objects):<8d} subscription objects\")\n", - " callback_objects: Dict[int, 'CallbackObject'] = list_to_dict(df_to_type_list(handler.data.callback_objects, 'CallbackObject')); print(f\"Processed {len(callback_objects):<8d} callback objects\")\n", - " callback_symbols: Dict[int, 'CallbackSymbol'] = list_to_dict(df_to_type_list(handler.data.callback_symbols, 'CallbackSymbol')); print(f\"Processed {len(callback_symbols):<8d} callback symbols\")\n", - " publish_instances: List['PublishInstance'] = df_to_type_list(handler.data.rcl_publish_instances, 'PublishInstance'); print(f\"Processed {len(publish_instances):<8d} publish instances\")\n", - " callback_instances: List['CallbackInstance'] = df_to_type_list(handler.data.callback_instances, 'CallbackInstance'); print(f\"Processed {len(callback_instances):<8d} callback instances\")\n", + "_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH])\n", + "_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n", + " \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n", "\n", - " _unique_topic_names = {*(pub.topic_name for pub in publishers.values()), *(sub.topic_name for sub in subscriptions.values())}\n", - " topics: Dict[str, 'Topic'] = list_to_dict(map(Topic, _unique_topic_names), key=\"name\"); print(f\"Processed {len(topics):<8d} topics\")\n", + "# Help the IDE recognize those identifiers\n", + "nodes = publishers = subscriptions = timers = timer_node_links = subscription_objects = callback_objects = callback_symbols = publish_instances = callback_instances = topics = None\n", "\n", - " print(\"Caching dynamic properties...\")\n", - "\n", - " [(o.path, o.publishers, o.subscriptions, o.timers) for o in nodes.values()] ; print(\"Cached node properties\")\n", - " [(o.instances, o.subscriptions) for o in publishers.values()] ; print(\"Cached publisher properties\")\n", - " 
[(o.publishers, o.subscription_objects) for o in subscriptions.values()] ; print(\"Cached subscription properties\")\n", - " [(o.nodes) for o in timers.values()] ; print(\"Cached timer properties\")\n", - " [(o.callback_instances, o.owner, o.owner_info) for o in callback_objects.values()] ; print(\"Cached callback object properties\")\n", - " [(o.callback_objs) for o in callback_symbols.values()] ; print(\"Cached callback symbol properties\")\n", - " [(o.publishers, o.subscriptions) for o in topics.values()] ; print(\"Cached topic properties\")\n", - "\n", - " fields_to_pickle = [\n", - " \"nodes\",\n", - " \"publishers\",\n", - " \"subscriptions\",\n", - " \"timers\",\n", - " \"timer_node_links\",\n", - " \"subscription_objects\",\n", - " \"callback_objects\",\n", - " \"callback_symbols\",\n", - " \"publish_instances\",\n", - " \"callback_instances\",\n", - " \"topics\"\n", - " ]\n", - "\n", - " pkl_dict = {key: globals()[key] for key in fields_to_pickle}\n", - "\n", - " print(\"Pickling...\")\n", - " with open(pkl_filename, \"wb\") as f:\n", - " pickle.dump(pkl_dict, f)\n", - "else:\n", - " print(\"Found pickled ROS objects from previous session, restoring...\")\n", - " with open(pkl_filename, \"rb\") as f:\n", - " pkl_dict = pickle.load(f)\n", - " for k, v in pkl_dict.items():\n", - " globals()[k] = v\n", + "for name in _tr_globals:\n", + " globals()[name] = getattr(_tracing_context, name)\n", "\n", "print(\"Done.\")\n" ] @@ -502,7 +137,7 @@ "\n", "for sym in callback_symbols.values():\n", " try:\n", - " cbo = list(filter(lambda val: val.callback_object==sym.id, callback_objects.values()))\n", + " cbo = list(filter(lambda val: val.callback_object == sym.id, callback_objects.values()))\n", " assert len(cbo) == 1\n", " cbo = cbo[0]\n", " except:\n", @@ -516,20 +151,22 @@ "\n", "sym_table.sort(key=lambda tup: tup[1])\n", "\n", + "\n", "def trim(string, length):\n", " if len(string) > length:\n", - " return f\"{string[:length-3]}...\"\n", + " return f\"{string[:length - 3]}...\"\n", " return string\n", "\n", + "\n", "for sym, type, info in sym_table:\n", - " sym: CallbackSymbol\n", + " sym: TrCallbackSymbol\n", " pretty_sym = Ros2DataModelUtil._prettify(None, sym.symbol)\n", " pretty_sym = re.sub(r\"std::shared_ptr<(.*?) 
*(const)?>\", r\"\\1*\", pretty_sym)\n", " try:\n", " i = len(sym.callback_obj.callback_instances)\n", " except KeyError:\n", " i = -1\n", - " print(f\"{trim(pretty_sym, 100):100s}: i={i:>4d} {type:12s} n={info['node']:40s}\", end=' ') \n", + " print(f\"{trim(pretty_sym, 100):100s}: i={i:>4d} {type:12s} n={info['node']:40s}\", end=' ')\n", " if type == 'Timer':\n", " print(f\"p={info['period']:7s}\")\n", " elif type == 'Subscription':\n", @@ -562,7 +199,8 @@ "# Aggregate topics that have the same pubs and subs\n", "topic_cohorts = {}\n", "for topic in topics.values():\n", - " key = (frozenset({*(pub.node_handle for pub in topic.publishers)}), frozenset({*(sub.node_handle for sub in topic.subscriptions)}))\n", + " key = (frozenset({*(pub.node_handle for pub in topic.publishers)}),\n", + " frozenset({*(sub.node_handle for sub in topic.subscriptions)}))\n", " if key not in topic_cohorts:\n", " topic_cohorts[key] = []\n", " topic_cohorts[key].append(topic)\n", @@ -593,7 +231,7 @@ "source": [ "unknowns = {}\n", "\n", - "print_node_timer = lambda node_path, period: print(f\"{node_path:<90s}: {1/(period*1e-9):8.2f}Hz\")\n", + "print_node_timer = lambda node_path, period: print(f\"{node_path:<90s}: {1 / (period * 1e-9):8.2f}Hz\")\n", "\n", "for timer in timers.values():\n", " timer_nodes = timer.nodes\n", @@ -603,7 +241,7 @@ " unknowns[timer.period] += 1\n", "\n", " for node in timer_nodes: print_node_timer(node.path, timer.period)\n", - " \n", + "\n", "for period, count in unknowns.items():\n", " print_node_timer(f\"UNKNOWN (x{count})\", period)\n", "\n", @@ -644,7 +282,7 @@ " continue\n", " if pub.id not in pub_stats:\n", " pub_stats[pub.id] = {'times': []}\n", - " pub_stats[pub.id]['times'].append(pi.timestamp*1e-9) # Nanoseconds to seconds float\n", + " pub_stats[pub.id]['times'].append(pi.timestamp * 1e-9) # Nanoseconds to seconds float\n", "\n", "print(f\"{unknown} unknown publisher handles ({len(pub_stats)} known ones)\")" ] @@ -678,7 +316,7 @@ " v['period'] = pub_time_diff.mean()\n", " v['period_std'] = pub_time_diff.std()\n", " v['frequency'] = 1 / v['period']\n", - " v['frequency_std'] = (1/pub_time_diff).std()\n", + " v['frequency_std'] = (1 / pub_time_diff).std()\n", "\n", " try:\n", " publisher = publishers[k]\n", @@ -686,16 +324,16 @@ " topic_name = publisher.topic_name\n", " node_path = publisher_node.path\n", " except Exception:\n", - " topic_name=\"UNKNOWN\"\n", - " node_path=\"UNKNOWN\"\n", - " \n", - " fig = plt.figure(figsize=(15,5))\n", + " topic_name = \"UNKNOWN\"\n", + " node_path = \"UNKNOWN\"\n", + "\n", + " fig = plt.figure(figsize=(15, 5))\n", " ax = fig.add_subplot()\n", - " ax.hist(1/pub_time_diff)\n", + " ax.hist(1 / pub_time_diff)\n", " ax.set_xlabel(\"Publication Frequency [Hz]\")\n", " ax.set_ylabel(\"#Publications\")\n", " ax.set_title(f\"{node_path} =({v['frequency']:.2f}Hz)=> {topic_name}\")\n", - " plt.savefig('/'.join((fig_dirname, f\"{i:06}{node_path}__{topic_name}\".replace('/','-'))))\n" + " plt.savefig('/'.join((fig_dirname, f\"{i:06}{node_path}__{topic_name}\".replace('/', '-'))))\n" ] }, { @@ -724,30 +362,30 @@ "topic_filters = [\"/rosout\", \"/parameter_events\", \"/diagnostics\"]\n", "\n", "from pyvis.network import Network\n", + "\n", "net = Network(notebook=True, height='750px', width='100%', bgcolor='#ffffff', font_color='#000000')\n", "\n", "net.add_node(\"INPUT\", label=\"Input\", size=100, color=\"green\", physics=False, x=0, y=0)\n", "net.add_node(\"OUTPUT\", label=\"Output\", size=100, color=\"red\", physics=False, x=6000, y=0)\n", "\n", - 
"\n", "for node in nodes.values():\n", - " if any(f in node.path for f in node_filters): \n", + " if any(f in node.path for f in node_filters):\n", " continue\n", " net.add_node(node.id, label=node.name, title=node.path, size=20, color=\"#333\")\n", "\n", "for cohort_key, cohort_topics in topic_cohorts.items():\n", " cohort_topic_names = [topic.name for topic in cohort_topics if not any(f in topic.name for f in topic_filters)]\n", - " if not cohort_topic_names: \n", + " if not cohort_topic_names:\n", " continue\n", - " cohort_id=\"\\n\".join(cohort_topic_names)\n", - " cohort_weight=len(cohort_topic_names)\n", + " cohort_id = \"\\n\".join(cohort_topic_names)\n", + " cohort_weight = len(cohort_topic_names)\n", " net.add_node(cohort_id, label=\" \", title=cohort_id, size=5, color=\"#333\")\n", - " \n", + "\n", " pubs = cohort_key[0]\n", " subs = cohort_key[1]\n", " n_pubs = len(pubs)\n", " n_subs = len(subs)\n", - " \n", + "\n", " try:\n", " if not n_pubs:\n", " net.add_edge(\"INPUT\", cohort_id, arrows=\"to\", color=\"green\", weight=cohort_weight)\n", @@ -791,25 +429,28 @@ "def filter_none(ls):\n", " return filter(lambda x: x is not None, ls)\n", "\n", + "\n", "def safe_map(func, ls):\n", " def safe_func(arg):\n", " try:\n", " return func(arg)\n", " except:\n", " return None\n", - " \n", + "\n", " return map(safe_func, ls)\n", "\n", + "\n", "pub_use_delays = {node.id: {\n", - " 'pubs': {}, \n", - " 'invocations': {}, \n", - " 'n_unknown_invocations': 0, \n", - " 'n_pub_timestamps': 0\n", - " } for node in nodes.values()}\n", + " 'pubs': {},\n", + " 'invocations': {},\n", + " 'n_unknown_invocations': 0,\n", + " 'n_pub_timestamps': 0\n", + "} for node in nodes.values()}\n", "\n", "for node in nodes.values():\n", " node_pub_use_dict = pub_use_delays[node.id]\n", - " timestamp_min = np.inf; timestamp_max = 0\n", + " timestamp_min = np.inf;\n", + " timestamp_max = 0\n", "\n", " n_pub_timestamps = 0\n", " for sub in node.subscriptions:\n", @@ -818,10 +459,12 @@ " pub_timestamps = [inst.timestamp for inst in pub.instances]\n", "\n", " try:\n", - " pub_t_min = min(pub_timestamps); pub_t_max = max(pub_timestamps)\n", + " pub_t_min = min(pub_timestamps);\n", + " pub_t_max = max(pub_timestamps)\n", " except ValueError:\n", - " pub_t_min = np.inf; pub_t_max = 0\n", - " \n", + " pub_t_min = np.inf;\n", + " pub_t_max = 0\n", + "\n", " if pub_t_min < timestamp_min: timestamp_min = pub_t_min\n", " if pub_t_max > timestamp_max: timestamp_max = pub_t_max\n", "\n", @@ -829,9 +472,11 @@ " node_pub_use_dict['n_pub_timestamps'] += len(pub_timestamps)\n", "\n", " timer_cb_objs = list(filter_none(safe_map(lambda timer: timer.callback_object, node.timers)))\n", - " subsc_cb_objs = list(filter_none(safe_map(lambda subsc: subsc.subscription_object.callback_object, node.subscriptions)))\n", + " subsc_cb_objs = list(\n", + " filter_none(safe_map(lambda subsc: subsc.subscription_object.callback_object, node.subscriptions)))\n", "\n", - " print(f\"{node.path:95s} has {len(timer_cb_objs):1d} timer callbacks, {len(subsc_cb_objs):2d} subscription callbacks, {len(node_pub_use_dict['pubs']):2d} subscribed topics.\")\n", + " print(\n", + " f\"{node.path:95s} has {len(timer_cb_objs):1d} timer callbacks, {len(subsc_cb_objs):2d} subscription callbacks, {len(node_pub_use_dict['pubs']):2d} subscribed topics.\")\n", "\n", " node_invocations = node_pub_use_dict['invocations']\n", "\n", @@ -859,7 +504,7 @@ "os.makedirs(fig_dirname, exist_ok=True)\n", "plt.close('all')\n", "\n", - "node_filters=[]#\"transform_listener_impl\",]\n", + 
"node_filters = [] #\"transform_listener_impl\",]\n", "\n", "nodes_filtered = [node for node in nodes.values() if not any(f in node.path for f in node_filters)]\n", "print(f\"Ignoring {len(nodes.values()) - len(nodes_filtered)} nodes due to filters.\")\n", @@ -868,7 +513,8 @@ "\n", "zero_color = cm.get_cmap('viridis')(0.0)\n", "\n", - "for node_i, (node, node_path, node_pub_use_dict) in enumerate(map(lambda node: (node, node.path, pub_use_delays[node.id]), nodes_filtered)):\n", + "for node_i, (node, node_path, node_pub_use_dict) in enumerate(\n", + " map(lambda node: (node, node.path, pub_use_delays[node.id]), nodes_filtered)):\n", "\n", " if not node_pub_use_dict['invocations']:\n", " print(f\"{node_path:95s} has no invocations, skipping.\")\n", @@ -878,13 +524,14 @@ " print(f\"Skipping {node_path}, no publications\")\n", " continue\n", "\n", - " fig = plt.figure(figsize=(15,5))\n", + " fig = plt.figure(figsize=(15, 5))\n", " ax: plt.Axes = fig.add_subplot()\n", "\n", " max_pubs_per_topic = max(len(pubs) for pubs in node_pub_use_dict['pubs'].values())\n", " topic_names, topic_pubs = (zip(*node_pub_use_dict['pubs'].items()))\n", "\n", - " vmin = 0; vmax = max_pubs_per_topic\n", + " vmin = 0;\n", + " vmax = max_pubs_per_topic\n", "\n", " y_labels = []\n", " current_y = 0\n", @@ -897,24 +544,26 @@ " sym = re.sub(r\"std::shared_ptr<(.*?)>\", r\"\\1*\", sym)\n", "\n", " cb_owner = cb_obj.owner\n", - " if isinstance(cb_owner, Timer):\n", + " if isinstance(cb_owner, TrTimer):\n", " cb_type = \"T\"\n", - " elif isinstance(cb_owner, SubscriptionObject):\n", + " elif isinstance(cb_owner, TrSubscriptionObject):\n", " cb_type = \"S\"\n", " except KeyError or AttributeError:\n", " sym = \"UNKNOWN\"\n", " cb_type = \"U\"\n", - " \n", + "\n", " y_labels.append(f\"{sym} {cb_type}\")\n", " n_markers = len(cb_invocations)\n", "\n", - " points_x = []; points_y = []\n", + " points_x = [];\n", + " points_y = []\n", " for time, dur in cb_invocations:\n", - " time = time.timestamp() - common_offset; dur = dur.total_seconds()\n", - " points_x += [time, time+dur, None]\n", + " time = time.timestamp() - common_offset;\n", + " dur = dur.total_seconds()\n", + " points_x += [time, time + dur, None]\n", " points_y += [current_y, current_y, 0.0]\n", - " \n", - " ax.plot(points_x,points_y, marker='.', c=zero_color)\n", + "\n", + " ax.plot(points_x, points_y, marker='.', c=zero_color)\n", " current_y += 1\n", "\n", " n_cbs = current_y\n", @@ -922,17 +571,19 @@ " for topic_i, (topic_name, pubs) in enumerate(zip(topic_names, topic_pubs)):\n", " for pub_i, (pub_name, timestamps) in enumerate(pubs.items()):\n", " n_markers = len(timestamps)\n", - " ax.scatter(np.array(timestamps)*1e-9 - common_offset, (current_y,) * n_markers, marker='.', c=(pub_i,) * n_markers, vmin=vmin, vmax=vmax)\n", - " \n", + " ax.scatter(np.array(timestamps) * 1e-9 - common_offset, (current_y,) * n_markers, marker='.',\n", + " c=(pub_i,) * n_markers, vmin=vmin, vmax=vmax)\n", + "\n", " y_labels.append(topic_name)\n", " current_y += 1\n", - " \n", + "\n", " trigger_strs = []\n", " t = node.timers\n", " if t:\n", " n_timers = len(t)\n", - " freqs = map(lambda timer: 1 / (timer.period*1e-9), t)\n", - " trigger_strs.append(f\"{n_timers} timer{'s' if n_timers != 1 else ''}, {'Hz, '.join((f'{freq:.0f}' for freq in freqs))}Hz\")\n", + " freqs = map(lambda timer: 1 / (timer.period * 1e-9), t)\n", + " trigger_strs.append(\n", + " f\"{n_timers} timer{'s' if n_timers != 1 else ''}, {'Hz, '.join((f'{freq:.0f}' for freq in freqs))}Hz\")\n", " if 
node.subscriptions:\n", " n_subs = len(node.subscriptions)\n", " trigger_strs.append(f\"{n_subs} subscription{'s' if n_subs != 1 else ''}\")\n", @@ -946,9 +597,80 @@ " ax.set_xlim(50, 50.25)\n", "\n", " ax.hlines(n_cbs - 0.5, *ax.get_xlim(), linestyles='dashed')\n", - " plt.savefig(os.path.join(fig_dirname, f\"{node_i:06}{node_path}\".replace('/','-')))" + " plt.savefig(os.path.join(fig_dirname, f\"{node_i:06}{node_path}\".replace('/', '-')))" ] }, + { + "cell_type": "markdown", + "source": [ + "# ROS2 Tracing & Clang Matching" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "def _load_cl_objects():\n", + " return process_clang_output(CL_PATH)\n", + "\n", + "_cl_context: ClContext = cached(\"cl_objects\", _load_cl_objects, [CL_PATH])" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "#################################################\n", + "# Match nodes\n", + "#################################################\n", + "\n", + "node_map = {}\n", + "for node in nodes:\n", + " node: TrNode\n", + " for tu_path, tu in _cl_context.translation_units.items():\n", + " for cl_node in tu.nodes.values():\n", + " if node.name == cl_node.ros_name:\n", + " if node in node_map:\n", + " print(f\"[WARN ] Node already mapped: {node} -> {node_map[node]}\")\n", + " node_map[node] = cl_node\n", + "\n", + "#################################################\n", + "# Match subscriptions\n", + "#################################################\n", + "\n", + "\n", + "\n", + "#################################################\n", + "# Match publishers\n", + "#################################################\n", + "\n", + "\n", + "\n", + "#################################################\n", + "# Match timers\n", + "#################################################" + ], + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + } + }, { "cell_type": "markdown", "metadata": { @@ -984,14 +706,16 @@ "\n", "LatencyStats = pd.Series\n", "\n", + "\n", "@dataclass\n", "class LatencyGraph:\n", - " verts: Set[CallbackObject]\n", - " edges: Dict[Tuple[CallbackObject, CallbackObject], Tuple[Topic, LatencyStats]]\n", - " starts: Dict[CallbackObject, Topic]\n", - " ends: Dict[CallbackObject, Topic]\n", + " verts: Set[TrCallbackObject]\n", + " edges: Dict[Tuple[TrCallbackObject, TrCallbackObject], Tuple[TrTopic, LatencyStats]]\n", + " starts: Dict[TrCallbackObject, TrTopic]\n", + " ends: Dict[TrCallbackObject, TrTopic]\n", "\n", - "def pub_use_latencies(cb_instances: List[CallbackInstance], pub_instances: List[PublishInstance]):\n", + "\n", + "def pub_use_latencies(cb_instances: List[TrCallbackInstance], pub_instances: List[TrPublishInstance]):\n", " cb_times = sorted([inst.timestamp.timestamp() for inst in cb_instances])\n", "\n", " if not pub_instances:\n", @@ -1004,17 +728,19 @@ " ret_series = pd.Series(pub_use_lats, index=cb_times)\n", " return ret_series\n", "\n", + "\n", "def inst_runtime_interval(cb_inst):\n", " inst_t_min = cb_inst.timestamp.timestamp()\n", " inst_t_max = inst_t_min + cb_inst.duration.total_seconds()\n", " return (inst_t_min, inst_t_max)\n", "\n", - "def count_pub_insts_in_intervals(cb_intervals: List[Tuple[float, float]], pub_insts: List[PublishInstance]):\n", + "\n", + "def count_pub_insts_in_intervals(cb_intervals: List[Tuple[float, 
float]], pub_insts: List[TrPublishInstance]):\n",
     "    \"\"\"\n",
     "    Counts number of publication instances that lie within one of the cb_intervals.\n",
     "    \"\"\"\n",
     "    pub_timestamps = [inst.timestamp * 1e-9 for inst in pub_insts]\n",
-    "    \n",
+    "\n",
     "    # Algorithm: Two-pointer method\n",
     "    # With both the pub_timestamps and cb_intervals sorted ascending,\n",
     "    # we can cut down the O(m*n) comparisons to O(m+n).\n",
@@ -1038,6 +764,7 @@
     "\n",
     "    return n_overlaps\n",
     "\n",
+    "\n",
     "#################################################\n",
     "# Identify input and output topics\n",
     "#################################################\n",
@@ -1050,9 +777,9 @@
     "# publications of each callback\n",
     "#################################################\n",
     "\n",
-    "cb_to_scored_pub: Dict[CallbackObject, Set[Tuple[Publisher, float]]] = {}\n",
-    "topic_to_dep_cb: Dict[Topic, Set[CallbackObject]] = {}\n",
-    "pub_cb_to_lat_stats: Dict[Tuple[Publisher, CallbackObject], LatencyStats] = {}\n",
+    "cb_to_scored_pub: Dict[TrCallbackObject, Set[Tuple[TrPublisher, float]]] = {}\n",
+    "topic_to_dep_cb: Dict[TrTopic, Set[TrCallbackObject]] = {}\n",
+    "pub_cb_to_lat_stats: Dict[Tuple[TrPublisher, TrCallbackObject], LatencyStats] = {}\n",
     "\n",
     "with ProgressPrinter(\"Processing\", len(callback_objects)) as p:\n",
     "    for cb in callback_objects.values():\n",
@@ -1069,21 +796,22 @@
     "    # - Timer callbacks: no EXPLICIT dependencies\n",
     "    # - Subscription callbacks: callback depends on the subscribed topic. Possibly also has other IMPLICIT dependencies\n",
     "\n",
-    "        if type(cb.owner).__name__ == SubscriptionObject.__name__:\n",
+    "        if type(cb.owner).__name__ == TrSubscriptionObject.__name__:\n",
     "            owner_node = cb.owner.subscription.node\n",
-    "            dep_topics = [cb.owner.subscription.topic,]\n",
-    "        elif type(cb.owner).__name__ == Timer.__name__:\n",
+    "            dep_topics = [cb.owner.subscription.topic]\n",
+    "        elif type(cb.owner).__name__ == TrTimer.__name__:\n",
     "            owner_nodes = cb.owner.nodes\n",
     "            if len(owner_nodes) != 1:\n",
-    "                raise(ValueError(\"Timer has more than one owner!\"))\n",
+    "                raise ValueError(\"Timer has more than one owner!\")\n",
     "            dep_topics = []\n",
     "        elif cb.owner is None:\n",
     "            dep_topics = []\n",
     "            continue\n",
     "        else:\n",
-    "            raise RuntimeError(f\"Callback owners other than timers/subscriptions cannot be handled: {cb.owner} {cb.owner_info}\")\n",
+    "            raise RuntimeError(\n",
+    "                f\"Callback owners other than timers/subscriptions cannot be handled: {cb.owner} {cb.owner_info}\")\n",
     "\n",
-    "        for topic in dep_topics: \n",
+    "        for topic in dep_topics:\n",
     "            if topic not in topic_to_dep_cb:\n",
     "                topic_to_dep_cb[topic] = set()\n",
     "            topic_to_dep_cb[topic].add(cb)\n",
@@ -1096,14 +824,15 @@
     "        # assume that they are published by the callback\n",
     "\n",
     "        cb_runtime_intervals = [inst_runtime_interval(inst) for inst in cb.callback_instances]\n",
-    "        cb_pub_overlap_counts = [count_pub_insts_in_intervals(cb_runtime_intervals, pub.instances) for pub in owner_node.publishers]\n",
+    "        cb_pub_overlap_counts = [count_pub_insts_in_intervals(cb_runtime_intervals, pub.instances) for pub in\n",
+    "                                 owner_node.publishers]\n",
     "\n",
     "        for pub, olap_count in zip(owner_node.publishers, cb_pub_overlap_counts):\n",
     "            if olap_count == 0 or not pub.instances:\n",
     "                continue\n",
     "            score = olap_count / len(pub.instances)\n",
     "\n",
-    "            if cb not in cb_to_scored_pub: \n",
+    "            if cb not in cb_to_scored_pub:\n",
     "                cb_to_scored_pub[cb] = set()\n",
     "            cb_to_scored_pub[cb].add((pub, score))\n"
    ]
  },
@@ -1191,29 +920,36 @@
     "    cb, score = tup\n",
     "    pretty_sym 
= Ros2DataModelUtil._prettify(None, callback_symbols[cb.callback_object].symbol)\n", " pretty_sym = re.sub(r\"std::shared_ptr<(.*?) *(const)?>\", r\"\\1*\", pretty_sym)\n", - " return f'{score*100:>10.6f}% {pretty_sym}'\n", + " return f'{score * 100:>10.6f}% {pretty_sym}'\n", + "\n", + "\n", " cbstr = ',\\n '.join(map(_mapfun, scored_cbs))\n", " print(f\"{pub.topic_name}:\\n {cbstr}\")\n", "\n", "inputs = {}\n", "outputs = {}\n", "for topic in out_topics:\n", - " outputs.update({cb: topic for pub in topic.publishers if pub in pub_to_scored_cb for cb, score in pub_to_scored_cb[pub]})\n", + " outputs.update(\n", + " {cb: topic for pub in topic.publishers if pub in pub_to_scored_cb for cb, score in pub_to_scored_cb[pub]})\n", "for topic in in_topics:\n", - " inputs.update({sub_obj.callback_object: topic for sub in topic.subscriptions for sub_obj in sub.subscription_objects})\n", - "\n", + " inputs.update(\n", + " {sub_obj.callback_object: topic for sub in topic.subscriptions for sub_obj in sub.subscription_objects})\n", "\n", "#################################################\n", "# Filter callback objects and topics\n", "#################################################\n", "\n", "callback_symbol_filters = [\n", - " \"rcl_interfaces::msg::ParameterEvent\", \"diagnostic_updater::Updater\", \n", - " \"rclcpp::ParameterService::ParameterService\", \"tf2_ros::TransformListener\",\n", - " \"rclcpp_components::ComponentManager\", \"diagnostic_aggregator::Aggregator\"\n", - " ]\n", + " \"rcl_interfaces::msg::ParameterEvent\", \"diagnostic_updater::Updater\",\n", + " \"rclcpp::ParameterService::ParameterService\", \"tf2_ros::TransformListener\",\n", + " \"rclcpp_components::ComponentManager\", \"diagnostic_aggregator::Aggregator\"\n", + "]\n", "\n", - "verts = set(filter(lambda vert: not any(f in callback_symbols[vert.callback_object].symbol for f in callback_symbol_filters), verts))\n", + "verts = set(\n", + " filter(\n", + " lambda vert: not any(\n", + " f in callback_symbols[vert.callback_object].symbol for f in callback_symbol_filters),\n", + " verts))\n", "edges = {(cb1, cb2): val for (cb1, cb2), val in edges.items() if cb1 in verts and cb2 in verts}\n", "outputs = {cb: topic for cb, topic in outputs.items() if cb in verts}\n", "inputs = {cb: topic for cb, topic in inputs.items() if cb in verts}\n", @@ -1252,6 +988,7 @@ "def _find_node(path):\n", " return next(filter(lambda n: n.path == path, nodes.values()))\n", "\n", + "\n", "from ruamel.yaml import YAML\n", "\n", "yaml = YAML()\n", @@ -1260,9 +997,9 @@ " # Convert node path to node instance\n", " node_internal_deps = {_find_node(path): {\n", " callback_objects[cb_id]: [callback_objects[dep_id] for dep_id in dep_ids]\n", - " for cb_id, dep_ids \n", + " for cb_id, dep_ids\n", " in deps.items()\n", - " } for path, deps in node_internal_deps.items()}\n", + " } for path, deps in node_internal_deps.items()}\n", "\n", "for node, cb_mappings in node_internal_deps.items():\n", " print(node)\n", @@ -1287,7 +1024,9 @@ "\n", "import graphviz as gv\n", "\n", - "g = gv.Digraph('G', filename=\"latency_graph.gv\", node_attr={'shape': 'record', 'margin': '0.00001', 'width': '0.00001', 'height': '0.001'}, graph_attr={'pack': '1'})\n", + "g = gv.Digraph('G', filename=\"latency_graph.gv\",\n", + " node_attr={'shape': 'record', 'margin': '0.00001', 'width': '0.00001', 'height': '0.001'},\n", + " graph_attr={'pack': '1'})\n", "g.graph_attr['rankdir'] = 'LR'\n", "\n", "g.node(\"INPUT\", gv.nohtml(\"{INPUT |}\"))\n", @@ -1297,11 +1036,11 @@ "export_dict = {}\n", 
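+    "# Sketch of what the loop below builds: one cluster per owning node,\n",
+    "# one record-shaped vertex per callback, and edges annotated with the\n",
+    "# mean topic latency; export_dict collects each callback's symbol and\n",
+    "# in/out edge counts, keyed by node path.\n",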
"\n", "for vert in latency_graph.verts:\n", - " vert: CallbackObject\n", + " vert: TrCallbackObject\n", "\n", - " if isinstance(vert.owner, Timer):\n", + " if isinstance(vert.owner, TrTimer):\n", " owner_nodes = vert.owner.nodes\n", - " elif isinstance(vert.owner, SubscriptionObject):\n", + " elif isinstance(vert.owner, TrSubscriptionObject):\n", " owner_nodes = [vert.owner.subscription.node]\n", " else:\n", " owner_nodes = []\n", @@ -1324,26 +1063,29 @@ " c.attr(bgcolor='lightgray')\n", "\n", " for cb in cbs:\n", - " cb: CallbackObject\n", + " cb: TrCallbackObject\n", " pretty_sym = Ros2DataModelUtil._prettify(None, callback_symbols[cb.callback_object].symbol)\n", " pretty_sym = re.sub(r\"std::shared_ptr<(.*?) *(const)?>\", r\"\\1*\", pretty_sym)\n", " pretty_sym = pretty_sym.replace('{', '\\\\{').replace('}', '\\\\}')\n", "\n", " export_dict[node.path].append({cb.id: {\n", - " 'symbol': pretty_sym, \n", - " 'ins': sum(map(lambda k: k[1].id == cb.id, latency_graph.edges.keys())) + sum(map(lambda k: k.id == cb.id, latency_graph.starts.keys())), \n", - " 'outs': sum(map(lambda k: k[0].id == cb.id, latency_graph.edges.keys())) + sum(map(lambda k: k.id == cb.id, latency_graph.ends.keys()))\n", - " }})\n", + " 'symbol': pretty_sym,\n", + " 'ins': sum(map(lambda k: k[1].id == cb.id, latency_graph.edges.keys())) + sum(\n", + " map(lambda k: k.id == cb.id, latency_graph.starts.keys())),\n", + " 'outs': sum(map(lambda k: k[0].id == cb.id, latency_graph.edges.keys())) + sum(\n", + " map(lambda k: k.id == cb.id, latency_graph.ends.keys()))\n", + " }})\n", "\n", " cb_durations = np.array(list(map(lambda inst: inst.duration.total_seconds(), cb.callback_instances)))\n", " if len(cb_durations) == 0:\n", " cb_durations = np.zeros(1)\n", " cb_dur_stats = (cb_durations.min(), cb_durations.mean(), cb_durations.max())\n", - " c.node(str(cb.id), gv.nohtml(f\"{{ |{pretty_sym} |}}\"), tooltip=f\"{cb_dur_stats[0]*1e6:.0f}µs, {cb_dur_stats[1]*1e6:.0f}µs, {cb_dur_stats[2]*1e6:.0f}µs\", fontcolor=('red' if isinstance(cb.owner, Timer) else 'black'))\n", - "\n", + " c.node(str(cb.id), gv.nohtml(f\"{{ |{pretty_sym} |}}\"),\n", + " tooltip=f\"{cb_dur_stats[0] * 1e6:.0f}µs, {cb_dur_stats[1] * 1e6:.0f}µs, {cb_dur_stats[2] * 1e6:.0f}µs\",\n", + " fontcolor=('red' if isinstance(cb.owner, TrTimer) else 'black'))\n", "\n", "for (c1, c2), (topic, lat_stats) in latency_graph.edges.items():\n", - " g.edge(f\"{c1.id}:out\", f\"{c2.id}:in\", tooltip=f\"{topic.name} ({lat_stats.mean()*1000:.2f}ms)\")\n", + " g.edge(f\"{c1.id}:out\", f\"{c2.id}:in\", tooltip=f\"{topic.name} ({lat_stats.mean() * 1000:.2f}ms)\")\n", "\n", "for c, t in latency_graph.starts.items():\n", " g.edge(\"INPUT:out\", f\"{c.id}:in\", tooltip=t.name)\n", @@ -1444,11 +1186,12 @@ "\n", "@dataclass\n", "class PathElem:\n", - " src: CallbackObject\n", - " dst: CallbackObject\n", - " topic: Topic\n", + " src: TrCallbackObject\n", + " dst: TrCallbackObject\n", + " topic: TrTopic\n", " latencies: LatencyStats\n", "\n", + "\n", "def get_latency_paths(cb1, cb_to_cb_to_lat_stats, goals, parent_path=[]) -> List[List[PathElem]]:\n", " if cb1 in goals:\n", " return [parent_path]\n", @@ -1459,7 +1202,8 @@ " paths = []\n", " for cb2 in cb_to_cb_to_lat_stats[cb1]:\n", " for topic, lats in cb_to_cb_to_lat_stats[cb1][cb2].items():\n", - " new_paths = get_latency_paths(cb2, cb_to_cb_to_lat_stats, goals, parent_path + [PathElem(cb1, cb2, topic, lats)])\n", + " new_paths = get_latency_paths(cb2, cb_to_cb_to_lat_stats, goals,\n", + " parent_path + [PathElem(cb1, cb2, topic, 
lats)])\n", " paths += new_paths\n", "\n", " return paths\n", @@ -1492,13 +1236,15 @@ "for cb in inputs:\n", " latency_paths += get_latency_paths(cb, cb_to_cb_to_lat_stats, outputs.keys())\n", "\n", + "\n", "def pl(l, lvl=0):\n", " if isinstance(l, list):\n", - " print(\" \"*lvl, type(l), len(l))\n", + " print(\" \" * lvl, type(l), len(l))\n", " for i in l:\n", - " pl(i, lvl+1)\n", + " pl(i, lvl + 1)\n", " else:\n", - " print(\" \"*lvl, type(l))\n", + " print(\" \" * lvl, type(l))\n", + "\n", "\n", "#pl(latency_paths)\n", "\n", @@ -1508,7 +1254,7 @@ " tot_lat = 0.0\n", " for item in path:\n", " tot_lat += item.latencies.mean()\n", - " print(f\"{item.topic.name:120s} {item.latencies.mean()*1000:.3f}ms {tot_lat*1000:.3f}ms\")" + " print(f\"{item.topic.name:120s} {item.latencies.mean() * 1000:.3f}ms {tot_lat * 1000:.3f}ms\")" ] }, { @@ -1528,7 +1274,7 @@ " ax.plot(lat_stats.index, np.where(np.isnan(lat_stats), 0, lat_stats))\n", "\n", "ax.set_ylim(0, .1)\n", - "ax.set_xlim(655+1.652795e9, 660+1.652795e9)\n", + "ax.set_xlim(655 + 1.652795e9, 660 + 1.652795e9)\n", "None" ] }, diff --git a/tracing_interop/__init__.py b/tracing_interop/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tracing_interop/types.py b/tracing_interop/types.py new file mode 100644 index 0000000..88f306d --- /dev/null +++ b/tracing_interop/types.py @@ -0,0 +1,339 @@ +from dataclasses import dataclass +from functools import cached_property +from typing import List, Dict + +import pandas as pd + +from tracetools_analysis.processor.ros2 import Ros2Handler +from tracetools_analysis.utils.ros2 import Ros2DataModelUtil + +from .utils import list_to_dict, df_to_type_list + + +@dataclass +class TrContext: + nodes: Dict[int, 'TrNode'] + publishers: Dict[int, 'TrPublisher'] + subscriptions: Dict[int, 'TrSubscription'] + timers: Dict[int, 'TrTimer'] + timer_node_links: Dict[int, 'TrTimerNodeLink'] + subscription_objects: Dict[int, 'TrSubscriptionObject'] + callback_objects: Dict[int, 'TrCallbackObject'] + callback_symbols: Dict[int, 'TrCallbackSymbol'] + publish_instances: List['TrPublishInstance'] + callback_instances: List['TrCallbackInstance'] + topics: Dict[str, 'TrTopic'] + + util: Ros2DataModelUtil | None + handler: Ros2Handler | None + + def __init__(self, util: Ros2DataModelUtil, handler: Ros2Handler): + self.util = util + self.handler = handler + + print("[TrContext] Processing ROS 2 objects from traces...") + + self.nodes = list_to_dict(df_to_type_list(handler.data.nodes, TrNode, _c=self)) + print(f" ├─ Processed {len(self.nodes):<8d} nodes") + self.publishers = list_to_dict(df_to_type_list(handler.data.rcl_publishers, TrPublisher, _c=self)) + print(f" ├─ Processed {len(self.publishers):<8d} publishers") + self.subscriptions = list_to_dict(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription, _c=self)) + print(f" ├─ Processed {len(self.subscriptions):<8d} subscriptions") + self.timers = list_to_dict(df_to_type_list(handler.data.timers, TrTimer, _c=self)) + print(f" ├─ Processed {len(self.timers):<8d} timers") + self.timer_node_links = list_to_dict(df_to_type_list(handler.data.timer_node_links, TrTimerNodeLink)) + print(f" ├─ Processed {len(self.timer_node_links):<8d} timer-node links") + self.subscription_objects = list_to_dict( + df_to_type_list(handler.data.subscription_objects, TrSubscriptionObject, _c=self)) + print(f" ├─ Processed {len(self.subscription_objects):<8d} subscription objects") + self.callback_objects = list_to_dict(df_to_type_list(handler.data.callback_objects, 
TrCallbackObject, _c=self)) + print(f" ├─ Processed {len(self.callback_objects):<8d} callback objects") + self.callback_symbols = list_to_dict(df_to_type_list(handler.data.callback_symbols, TrCallbackSymbol, _c=self)) + print(f" ├─ Processed {len(self.callback_symbols):<8d} callback symbols") + self.publish_instances = df_to_type_list(handler.data.rcl_publish_instances, TrPublishInstance, _c=self) + print(f" ├─ Processed {len(self.publish_instances):<8d} publish instances") + self.callback_instances = df_to_type_list(handler.data.callback_instances, TrCallbackInstance, _c=self) + print(f" ├─ Processed {len(self.callback_instances):<8d} callback instances") + + _unique_topic_names = {*(pub.topic_name for pub in self.publishers.values()), + *(sub.topic_name for sub in self.subscriptions.values())} + self.topics = list_to_dict(map(lambda name: TrTopic(name=name, _c=self), _unique_topic_names), key="name") + print(f" └─ Processed {len(self.topics):<8d} topics\n") + + print("[TrContext] Caching dynamic properties...") + + [(o.path, o.publishers, o.subscriptions, o.timers) for o in self.nodes.values()] + print(" ├─ Cached node properties") + [(o.instances, o.subscriptions) for o in self.publishers.values()] + print(" ├─ Cached publisher properties") + [(o.publishers, o.subscription_objects) for o in self.subscriptions.values()] + print(" ├─ Cached subscription properties") + [(o.nodes) for o in self.timers.values()] + print(" ├─ Cached timer properties") + [(o.callback_instances, o.owner, o.owner_info) for o in self.callback_objects.values()] + print(" ├─ Cached callback object properties") + [(o.callback_objs) for o in self.callback_symbols.values()] + print(" ├─ Cached callback symbol properties") + [(o.publishers, o.subscriptions) for o in self.topics.values()] + print(" └─ Cached topic properties\n") + + def __getstate__(self): + state = self.__dict__.copy() + del state["util"] + del state["handler"] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self.util = None + self.handler = None + + +@dataclass +class TrNode: + id: int + timestamp: int + tid: int + rmw_handle: int + name: str + namespace: str + _c: TrContext + + @cached_property + def path(self) -> str: + return '/'.join((self.namespace, self.name)) + + @cached_property + def publishers(self) -> List['TrPublisher']: + return list(filter(lambda pub: pub.node_handle == self.id, self._c.publishers.values())) + + @cached_property + def subscriptions(self) -> List['TrSubscription']: + return list(filter(lambda sub: sub.node_handle == self.id, self._c.subscriptions.values())) + + @cached_property + def timers(self) -> List['TrTimer']: + links = [link.id for link in self._c.timer_node_links.values() if link.node_handle == self.id] + return list(filter(lambda timer: timer.id in links, self._c.timers.values())) + + def __hash__(self): + return hash(self.id) + + +@dataclass +class TrPublisher: + id: int + timestamp: int + node_handle: int + rmw_handle: int + topic_name: str + depth: int + _c: TrContext + + @property + def node(self) -> 'TrNode': + return self._c.nodes[self.node_handle] + + @cached_property + def subscriptions(self) -> List['TrSubscription']: + return list(filter(lambda sub: sub.topic_name == self.topic_name, self._c.subscriptions.values())) + + @cached_property + def instances(self) -> List['TrPublishInstance']: + return list(filter(lambda inst: inst.publisher_handle == self.id, self._c.publish_instances)) + + @property + def topic(self) -> 'TrTopic': + return self._c.topics[self.topic_name] + 
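+    # NOTE: `subscriptions` and `instances` are @cached_property because each call
+    # would otherwise re-scan a context-wide collection, while `node` and `topic`
+    # stay plain properties: they are single dict lookups into the TrContext.
+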
+ def __hash__(self): + return hash(self.id) + + +@dataclass +class TrSubscription: + id: int + timestamp: int + node_handle: int + rmw_handle: int + topic_name: str + depth: int + _c: TrContext + + @property + def node(self) -> 'TrNode': + return self._c.nodes[self.node_handle] + + @cached_property + def publishers(self) -> List['TrPublisher']: + return list(filter(lambda pub: pub.topic_name == self.topic_name, self._c.publishers.values())) + + @cached_property + def subscription_objects(self) -> List['TrSubscriptionObject']: + return list( + filter(lambda sub_obj: sub_obj.subscription_handle == self.id, self._c.subscription_objects.values())) + + @property + def topic(self) -> 'TrTopic': + return self._c.topics[self.topic_name] + + def __hash__(self): + return hash(self.id) + + +@dataclass +class TrTimer: + id: int + timestamp: int + period: int + tid: int + _c: TrContext + + @cached_property + def nodes(self) -> List['TrNode']: + links = [link.node_handle for link in self._c.timer_node_links.values() if link.id == self.id] + return list(filter(lambda node: node.id in links, self._c.nodes.values())) + + @property + def callback_object(self) -> 'TrCallbackObject': + return self._c.callback_objects[self.id] + + def __hash__(self): + return hash(self.id) + + +@dataclass +class TrTimerNodeLink: + id: int + timestamp: int + node_handle: int + + +@dataclass +class TrSubscriptionObject: + id: int # subscription + timestamp: int + subscription_handle: int + _c: TrContext + + @property + def subscription(self) -> 'TrSubscription': + return self._c.subscriptions[self.subscription_handle] + + @property + def callback_object(self) -> 'TrCallbackObject': + return self._c.callback_objects[self.id] + + def __hash__(self): + return hash((self.id, self.timestamp, self.subscription_handle)) + + +@dataclass +class TrCallbackObject: + id: int # (reference) = subscription_object.id | timer.id | .... 
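+    # (`owner` below resolves this id by probing the timer, publisher, and
+    # subscription-object indices in turn, falling back to the trace handler's
+    # services/clients tables for service and client callbacks.)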
+ timestamp: int + callback_object: int + _c: TrContext + + @cached_property + def callback_instances(self) -> List['TrCallbackInstance']: + return list(filter(lambda inst: inst.callback_object == self.callback_object, self._c.callback_instances)) + + @property + def callback_symbol(self) -> 'TrCallbackSymbol': + return self._c.callback_symbols[self.id] + + @cached_property + def owner(self): + if self.id in self._c.timers: + return self._c.timers[self.id] + if self.id in self._c.publishers: + return self._c.publishers[self.id] + if self.id in self._c.subscription_objects: + return self._c.subscription_objects[self.id] + if self.id in self._c.handler.data.services.index: + return 'Service' + if self.id in self._c.handler.data.clients.index: + return 'Client' + return None + + @cached_property + def owner_info(self): + info = self._c.util.get_callback_owner_info(self.callback_object) + if info is None: + return None, None + + type_name, dict_str = info.split(" -- ") + kv_strs = dict_str.split(", ") + info_dict = {k: v for k, v in map(lambda kv_str: kv_str.split(": ", maxsplit=1), kv_strs)} + return type_name, info_dict + + def __hash__(self): + return hash((self.id, self.timestamp, self.callback_object)) + + +@dataclass +class TrPublishInstance: + publisher_handle: int + timestamp: int + message: int + _c: TrContext + + @property + def publisher(self) -> 'TrPublisher': + return self._c.publishers[self.publisher_handle] + + def __hash__(self): + return hash((self.publisher_handle, self.timestamp, self.message)) + + +@dataclass +class TrCallbackInstance: + callback_object: int + timestamp: pd.Timestamp + duration: pd.Timedelta + intra_process: bool + _c: TrContext + + @property + def callback_obj(self) -> 'TrCallbackObject': + return self._c.callback_objects[self.callback_object] + + def __hash__(self): + return hash((self.callback_object, self.timestamp, self.duration)) + + +@dataclass +class TrCallbackSymbol: + id: int # callback_object + timestamp: int + symbol: str + _c: TrContext + + @cached_property + def callback_objs(self) -> List['TrCallbackObject']: + return list(filter(lambda cb_obj: cb_obj.callback_object == self.id, self._c.callback_objects.values())) + + def __hash__(self): + return hash((self.id, self.timestamp, self.symbol)) + + +####################################### +# Self-defined (not from ROS2DataModel) +####################################### + +@dataclass +class TrTopic: + name: str + _c: TrContext + + @cached_property + def publishers(self) -> List['TrPublisher']: + return list(filter(lambda pub: pub.topic_name == self.name, self._c.publishers.values())) + + @cached_property + def subscriptions(self) -> List['TrSubscription']: + return list(filter(lambda sub: sub.topic_name == self.name, self._c.subscriptions.values())) + + def __hash__(self): + return hash(self.name) diff --git a/tracing_interop/utils.py b/tracing_interop/utils.py new file mode 100644 index 0000000..05b2329 --- /dev/null +++ b/tracing_interop/utils.py @@ -0,0 +1,24 @@ +import sys + +import pandas as pd + + +def row_to_type(row, type, has_idx, **type_kwargs): + return type(id=row.name, **row, **type_kwargs) if has_idx else type(**row, **type_kwargs) + + +def df_to_type_list(df, type, **type_kwargs): + has_idx = not isinstance(df.index, pd.RangeIndex) + return [row_to_type(row, type, has_idx, **type_kwargs) for _, row in df.iterrows()] + + +def by_index(df, index, type): + return df_to_type_list(df.loc[index], type) + + +def by_column(df, column_name, column_val, type): + return 
df_to_type_list(df[df[column_name] == column_val], type) + + +def list_to_dict(ls, key='id'): + return {getattr(item, key): item for item in ls} diff --git a/utils.py b/utils.py deleted file mode 100644 index 809267d..0000000 --- a/utils.py +++ /dev/null @@ -1,32 +0,0 @@ -import math - - -def left_abbreviate(string, limit=120): - return string if len(string) <= limit else f"...{string[:limit-3]}" - - -class ProgressPrinter: - def __init__(self, verb, n) -> None: - self.verb = verb - self.n = n - self.i = 0 - self.fmt_len = math.ceil(math.log10(n if n > 0 else 1)) - - def step(self, msg): - self.i += 1 - print(f"({self.i:>{self.fmt_len}d}/{self.n}) {self.verb} {left_abbreviate(msg):<120}", end="\r") - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_value, exc_traceback): - self.i -= 1 - - if exc_value: - self.step("error.") - print() - print(exc_value) - return - - self.step("done.") - print() \ No newline at end of file
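
For orientation, a short sketch of how the new `tracing_interop.utils` helpers compose. The dataframe below is a stand-in with an assumed shape (object ids in the index, one column per remaining dataclass field), not real trace output:

```python
from dataclasses import dataclass

import pandas as pd

from tracing_interop.utils import df_to_type_list, list_to_dict


@dataclass
class TrTimerNodeLink:  # same shape as the dataclass in tracing_interop.types
    id: int
    timestamp: int
    node_handle: int


# An id-indexed dataframe, mimicking what tracetools_analysis hands over
# (shape assumed for illustration).
df = pd.DataFrame(
    {"timestamp": [100, 200], "node_handle": [7, 8]},
    index=pd.Index([10, 11], name="id"),
)

# Non-RangeIndex => row_to_type passes each index value to the dataclass as `id=...`.
links = df_to_type_list(df, TrTimerNodeLink)
by_id = list_to_dict(links)  # {10: TrTimerNodeLink(...), 11: TrTimerNodeLink(...)}
assert by_id[11].node_handle == 8
```

This is the pattern `TrContext.__init__` applies to each trace table, threading `_c=self` through `**type_kwargs` where the target type needs back-references, so the resulting objects can resolve their cross-references lazily via `cached_property`.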