From dc3a8084b199f1b254c984f63f52eed399a98c88 Mon Sep 17 00:00:00 2001 From: Maximilian Schmeller Date: Sun, 25 Dec 2022 20:47:56 +0900 Subject: [PATCH] Allow for better TF2 insight and analysis [WIP], write more outputs to the specified out dir and not to the project dir, add E2E_EXACT_PATH setting to speed up execution for known paths --- latency_graph/latency_graph_structure.py | 2 +- matching/subscriptions.py | 2 +- message_tree/message_tree_algorithms.py | 24 ++- trace-analysis.ipynb | 250 ++++++++++++++--------- tracing_interop/tr_types.py | 25 ++- 5 files changed, 185 insertions(+), 118 deletions(-) diff --git a/latency_graph/latency_graph_structure.py b/latency_graph/latency_graph_structure.py index 8587d8a..c97e234 100644 --- a/latency_graph/latency_graph_structure.py +++ b/latency_graph/latency_graph_structure.py @@ -113,7 +113,7 @@ def _get_cb_topic_deps(nodes_to_cbs: Dict[TrNode, Set[TrCallbackObject]]): p.update() if type(cb.owner) == TrSubscriptionObject: - dep_topics = [cb.owner.subscription.topic] + dep_topics = cb.owner.subscription.topics elif type(cb.owner) == TrTimer: dep_topics = [] elif cb.owner is None: diff --git a/matching/subscriptions.py b/matching/subscriptions.py index d3d461e..4bee70c 100644 --- a/matching/subscriptions.py +++ b/matching/subscriptions.py @@ -260,7 +260,7 @@ def match(tr: TrContext, cl: ClContext): match tr_cb.owner: case TrSubscriptionObject(subscription=tr_sub): tr_sub: TrSubscription - tr_topic = tr_sub.topic_name + tr_topic = tr_sub.topic_names # TODO: str->list type refactoring if not tr_topic: continue case _: diff --git a/message_tree/message_tree_algorithms.py b/message_tree/message_tree_algorithms.py index 9e7e75c..e0f0713 100644 --- a/message_tree/message_tree_algorithms.py +++ b/message_tree/message_tree_algorithms.py @@ -2,7 +2,7 @@ import re from bisect import bisect_right from collections import defaultdict from functools import cache -from typing import List +from typing import List, Optional from tqdm import tqdm @@ -30,7 +30,7 @@ def _repr(inst: TrCallbackInstance | TrPublishInstance): def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: LatencyGraph, tr: TrContext, - excluded_path_patterns, time_limit_s): + excluded_path_patterns, time_limit_s, exact_path: Optional[List[str]] = None): """ Finds all (desired) dependencies of a publish/callback instance and returns a dependency tree. @@ -43,12 +43,19 @@ def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: Latenc start_time = inst.timestamp - def __get_dep_tree(inst, is_dep_cb, visited=None): + def __get_dep_tree(inst, is_dep_cb, visited=None, level=0): # If inst owner has been visited already, skip (loop prevention) if visited is not None and owner(inst) in visited: return None + # If an exact path is given, the owner of `inst` at `level` has to be identical to the corresponding item + # in `exact_path`. The level is non-positive (start at 0, descend one per recursion). + # `exact_path` is therefore traversed from back to front. + # If `level` is deeper than `exact_path` is long, the tree is also not of interest and thus discarded. + if exact_path is not None and ((level - 1 < -len(exact_path)) or exact_path[level - 1] != owner(inst)): + return None + # If we want to retrieve the tree, look in the cache first cache_key = (inst, is_dep_cb) if cache_key in __dep_tree_cache: @@ -81,7 +88,7 @@ def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: Latenc # print("Rec level", lvl) deps = list(filter(None, deps)) # Copy visited set for each child because we don't want separate paths to interfere with each other - deps = [__get_dep_tree(dep, children_are_dep_cbs, {*visited, owner(inst)}) for dep in deps] + deps = [__get_dep_tree(dep, children_are_dep_cbs, {*visited, owner(inst)}, level=level-1) for dep in deps] deps = list(filter(None, deps)) # Create tree instance, cache and return it @@ -92,7 +99,7 @@ def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: Latenc return __get_dep_tree(inst, False) -def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limit_s): +def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limit_s, exact_path=None): """ Builds the dependency trees for all messages published in any of `end_topics` and returns them as a list. """ @@ -106,7 +113,7 @@ def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limi msgs = list(pub.instances) for msg in tqdm(msgs, mininterval=5.0, desc="Processing output messages"): msg: TrPublishInstance - tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s) + tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s, exact_path=exact_path) all_trees.append(tree) return all_trees @@ -121,9 +128,9 @@ def inst_get_dep_msg(inst: TrCallbackInstance, tr: TrContext): return None sub_obj: TrSubscriptionObject = inst.callback_obj.owner - if sub_obj and sub_obj.subscription and sub_obj.subscription.topic: + if sub_obj and sub_obj.subscription and sub_obj.subscription.topics: # print(f"Subscription has no topic") - pubs = sub_obj.subscription.topic.publishers + pubs = [p for t in sub_obj.subscription.topics for p in t.publishers] else: pubs = [] @@ -144,7 +151,6 @@ def inst_get_dep_msg(inst: TrCallbackInstance, tr: TrContext): msg = msgs[0] return msg - # print(f"No messages found for topic {sub_obj.subscription.topic}") return None diff --git a/trace-analysis.ipynb b/trace-analysis.ipynb index 24a6e3f..f0c7c2d 100644 --- a/trace-analysis.ipynb +++ b/trace-analysis.ipynb @@ -8,16 +8,8 @@ "import os\n", "import sys\n", "import re\n", - "import math\n", "from tqdm import tqdm\n", - "from bisect import bisect_right\n", - "from termcolor import colored\n", - "\n", - "import matplotlib.patches as mpatch\n", - "from scipy import stats\n", - "from cycler import cycler\n", - "\n", - "import glob\n", + "from typing import Iterable\n", "\n", "import numpy as np\n", "import pandas as pd\n", @@ -70,7 +62,7 @@ "# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n", "# if kernel traces are also present.\n", "# TR_PATH = \"~/Downloads/iteration1_worker1/aw_replay/tracing/scenario-trace/ust\"\n", - "TR_PATH = \"data/trace-awsim-x86/ust\"\n", + "TR_PATH = \"/home/max/Projects/ma-measurements/artifacts_20221223_155956/tracing/max-ma-trace/ust\"\n", "\n", "# Path to the folder all artifacts from this notebook are saved to.\n", "# This entails plots as well as data tables.\n", @@ -80,7 +72,7 @@ "CACHING_ENABLED = False\n", "\n", "# Whether to annotate topics/publications with bandwidth/message size\n", - "BW_ENABLED = True\n", + "BW_ENABLED = False\n", "# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n", "# Used to annotate message sizes in E2E latency calculations\n", "BW_PATH = \"../ma-hw-perf-tools/data/messages-x86.h5\"\n", @@ -95,9 +87,9 @@ "\n", "# Whether to compute data flow graphs.\n", "# If you are only interested in E2E latencies, set this to False\n", - "DFG_ENABLED = True\n", + "DFG_ENABLED = False\n", "# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n", - "DFG_PLOT = True\n", + "DFG_PLOT = False\n", "\n", "# The maximum node namespace hierarchy level to be plotted.\n", "# Top-level (1): e.g. /sensing, /control, etc.\n", @@ -109,9 +101,9 @@ "DFG_INPUT_NODE_PATTERNS = [r\"^/sensing\"]\n", "# RegEx pattern for nodes that shall be marked as system outputs\n", "# These will be plotted with a double border\n", - "DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\", \"emergency\"]\n", + "DFG_OUTPUT_NODE_PATTERNS = [r\"^/control/external_cmd_converter\"]\n", "# RegEx for nodes which shall not be plotted in the DFG\n", - "DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\", r\"transform_listener_impl\"]\n", + "DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\"]\n", "\n", "# Whether to compute E2E latencies.\n", "E2E_ENABLED = True\n", @@ -131,15 +123,48 @@ "E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/command/control_cmd$\"]\n", "# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n", "# E.g. r\"^/sensing/\" will cover all sensing topics\n", - "E2E_INPUT_TOPIC_PATTERNS = [r\"^/sensing/.*?pointcloud\"]\n", + "E2E_INPUT_TOPIC_PATTERNS = [r\"pointcloud\"]\n", "\n", "# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n", "# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n", - "E2E_EXCL_PATH_PATTERNS = [r\"NDTScanMatcher\", r\"^/parameter_events\", \"hazard\", \"turn_indicator\", \"gear_cmd\", \"emergency_cmd\",\n", - " \"external_cmd\", \"/control/operation_mode\", \"/planning/scenario_planning/scenario$\"]\n", + "E2E_EXCL_PATH_PATTERNS = [r\"^/parameter_events\", \"hazard\", \"turn_indicator\", \"gear_cmd\", \"emergency_cmd\", \"emergency_state\",\n", + " \"external_cmd\", \"/control/operation_mode\"]\n", "\n", "# To specify paths of interest, topic/callback name patterns that HAVE TO OCCUR in each E2E path can be specified as RegEx here.\n", - "E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n", + "#E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n", + "E2E_INCL_PATH_PATTERNS = []\n", + "\n", + "# If an exact path through the system is known, this variabe can be set to a list (order matters!) of all elements on the path.\n", + "# The first item ist the one at the \"input end\" of the system, the last one the \"output end\".\n", + "E2E_EXACT_PATH = [\n", + " \"void(pointcloud_preprocessor::Filter)(sensor_msgs::msg::PointCloud2,pcl_msgs::msg::PointIndices)\",\n", + " \"/localization/util/downsample/pointcloud\",\n", + " \"void(NDTScanMatcher)(sensor_msgs::msg::PointCloud2)\",\n", + " \"/localization/pose_estimator/pose_with_covariance\",\n", + " \"void(EKFLocalizer)(geometry_msgs::msg::PoseWithCovarianceStamped)\",\n", + " \"void(EKFLocalizer)()\",\n", + " \"/localization/pose_twist_fusion_filter/kinematic_state\",\n", + " \"void(StopFilter)(nav_msgs::msg::Odometry)\",\n", + " \"/localization/kinematic_state\",\n", + " \"void(behavior_path_planner::BehaviorPathPlannerNode)(nav_msgs::msg::Odometry)\",\n", + " \"void(behavior_path_planner::BehaviorPathPlannerNode)()\",\n", + " \"/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id\",\n", + " \"void(behavior_velocity_planner::BehaviorVelocityPlannerNode)(autoware_auto_planning_msgs::msg::PathWithLaneId)\",\n", + " \"/planning/scenario_planning/lane_driving/behavior_planning/path\",\n", + " \"void(ObstacleAvoidancePlanner)(autoware_auto_planning_msgs::msg::Path)\",\n", + " \"/planning/scenario_planning/lane_driving/motion_planning/obstacle_avoidance_planner/trajectory\",\n", + " \"void(motion_planning::ObstacleStopPlannerNode)(autoware_auto_planning_msgs::msg::Trajectory)\",\n", + " \"/planning/scenario_planning/lane_driving/trajectory\",\n", + " \"void(ScenarioSelectorNode)(autoware_auto_planning_msgs::msg::Trajectory)\",\n", + " \"/planning/scenario_planning/scenario_selector/trajectory\",\n", + " \"void(motion_velocity_smoother::MotionVelocitySmootherNode)(autoware_auto_planning_msgs::msg::Trajectory)\",\n", + " \"/planning/scenario_planning/trajectory\",\n", + " \"void(autoware::motion::control::trajectory_follower_nodes::Controller)(autoware_auto_planning_msgs::msg::Trajectory)\",\n", + " \"void(autoware::motion::control::trajectory_follower_nodes::Controller)()\",\n", + " \"/control/trajectory_follower/control_cmd\",\n", + " \"void(vehicle_cmd_gate::VehicleCmdGate)(autoware_auto_control_msgs::msg::AckermannControlCommand)\",\n", + " \"/control/command/control_cmd\",\n", + "]\n", "\n", "# For development purposes only. Leave this at False.\n", "DEBUG = False\n", @@ -181,6 +206,8 @@ "for k, v in globals().copy().items():\n", " if not k.isupper():\n", " continue\n", + " if isinstance(v, Iterable) and not isinstance(v, str):\n", + " v = (\"\\n \" + (\" \" * 44)).join(list(map(str, v)))\n", " print(f\" {k:.<40s} := {v}\")" ], "metadata": { @@ -235,7 +262,17 @@ " \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n", "\n", "# Help the IDE recognize those identifiers by assigning a dummy value to their name.\n", - "nodes = publishers = subscriptions = timers = timer_node_links = subscription_objects = callback_objects = callback_symbols = publish_instances = callback_instances = topics = None\n", + "nodes: Index = Index([])\n", + "publishers: Index = Index([])\n", + "subscriptions: Index = Index([])\n", + "timers: Index = Index([])\n", + "timer_node_links: Index = Index([])\n", + "subscription_objects: Index = Index([])\n", + "callback_objects: Index = Index([])\n", + "callback_symbols: Index = Index([])\n", + "publish_instances: Index = Index([])\n", + "callback_instances: Index = Index([])\n", + "topics: Index = Index([])\n", "\n", "for name in _tr_globals:\n", " globals()[name] = getattr(_tracing_context, name)\n", @@ -246,6 +283,67 @@ "collapsed": false } }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "##################################################\n", + "# Split /tf into Artificial Topics\n", + "##################################################\n", + "# /tf is one ROS2 topic but carries multiple\n", + "# different streams of information:\n", + "# One for each publisher of the /tf topic.\n", + "# We can separate the messages on /tf according\n", + "# to those publishers.\n", + "# Still, all subscribers to /tf are subscribed\n", + "# to ALL of those split topics, so the\n", + "# found dependencies are still a vast over-\n", + "# approximation of the actual dataflow.\n", + "##################################################\n", + "\n", + "tf_topic: TrTopic = topics.by_name[\"/tf\"]\n", + "\n", + "tf_split_topics = []\n", + "for pub in tf_topic.publishers:\n", + " pub: TrPublisher\n", + " topic = TrTopic(f\"/tf[{pub.node.path}_{pub.id}]\", _tracing_context)\n", + " pub.topic_name = topic.name\n", + " for sub in tf_topic.subscriptions:\n", + " sub.topic_names.append(topic.name)\n", + "\n", + " tf_split_topics.append(topic)\n", + "\n", + "print(\"Rebuilding subscription index\")\n", + "subscriptions.rebuild()\n", + "print(\"Rebuilding publisher index\")\n", + "publishers.rebuild()\n", + "print(\"Adding new topics and rebuilding topic index\")\n", + "topics.append(tf_split_topics)" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "print(\"Artificial TF topics (/tf[...]):\")\n", + "for t in tf_split_topics:\n", + " print(f\"{t.name:.<110s} | {len(t.publishers)} pubs, {sum(map(lambda p: len(p.instances), t.publishers))}\")\n", + " for sub in t.subscriptions:\n", + " print(f\" {sub.node.path:.<106s} | {len(sub.subscription_object.callback_object.callback_instances)} {len(sub.node.publishers)} {sum(map(lambda p: len(p.instances), sub.node.publishers))}\")\n", + " #prefix = os.path.split(sub.node.path)[0]\n", + " #for node in nodes:\n", + " # if node.path.startswith(prefix) and node.path.removeprefix(prefix).count(\"/\") <= 1 and \"transform_listener_impl\" not in node.path:\n", + " # print(f\" -> {' ' * (len(prefix) - 3)}{node.path.removeprefix(prefix)}\")" + ], + "metadata": { + "collapsed": false + } + }, { "cell_type": "code", "execution_count": null, @@ -257,19 +355,19 @@ "\n", "for topic in sorted(topics, key=lambda t: t.name):\n", " topic: TrTopic\n", - " print(f\"{topic.name:.<120s} | {sum(map(lambda p: len(p.instances), topic.publishers))}\")\n", + " print(f\"{topic.name:.<130s} | {sum(map(lambda p: len(p.instances), topic.publishers)):>5d} msgs\")\n", "\n", "print(\"\\n[DEBUG] INPUT TOPICS\")\n", "for t in sorted(topics, key=lambda t: t.name):\n", " for f in E2E_INPUT_TOPIC_PATTERNS:\n", " if re.search(f, t.name):\n", - " print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers))}\")\n", + " print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")\n", "\n", "print(\"\\n[DEBUG] OUTPUT TOPICS\")\n", "for t in sorted(topics, key=lambda t: t.name):\n", " for f in E2E_OUTPUT_TOPIC_PATTERNS:\n", " if re.search(f, t.name):\n", - " print(f\"--[ENTKÄFERN] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers))}\")" + " print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")" ], "metadata": { "collapsed": false @@ -327,7 +425,7 @@ "\n", "from latency_graph.latency_graph_plots import plot_latency_graph_full\n", "\n", - "plot_latency_graph_full(lat_graph, _tracing_context, \"latency_graph_full\")" + "plot_latency_graph_full(lat_graph, _tracing_context, os.path.join(OUT_PATH, \"latency_graph_full\"))" ], "metadata": { "collapsed": false @@ -358,7 +456,7 @@ "\n", "from latency_graph.latency_graph_plots import plot_latency_graph_overview\n", "\n", - "plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, \"latency_graph_overview\")" + "plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, os.path.join(OUT_PATH, \"latency_graph_overview\"))" ], "metadata": { "collapsed": false, @@ -389,7 +487,7 @@ "end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n", "\n", "def _build_dep_trees():\n", - " return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S)\n", + " return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S, exact_path=E2E_EXACT_PATH)\n", "\n", "try:\n", " trees = cached(\"trees\", _build_dep_trees, [TR_PATH], not CACHING_ENABLED)\n", @@ -425,21 +523,6 @@ "collapsed": false } }, - { - "cell_type": "code", - "execution_count": null, - "outputs": [], - "source": [ - "%%skip_if_false DEBUG\n", - "\n", - "import pickle\n", - "with open(\"trees.pkl\", \"rb\") as f:\n", - " trees = pickle.load(f)" - ], - "metadata": { - "collapsed": false - } - }, { "cell_type": "code", "execution_count": null, @@ -456,9 +539,7 @@ "\n", "trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0,\n", " desc=\"Extracting E2E paths\")]\n", - "all_paths = [p for paths in trees_paths for p in paths]\n", - "relevant_paths = [p for p in all_paths\n", - " if any(map(lambda inst: re.search(\"^/sensing/.*?pointcloud\", owner(inst)), p))]\n" + "all_paths = [p for paths in trees_paths for p in paths]" ], "metadata": { "collapsed": false @@ -477,7 +558,7 @@ "# Group dataflows by DFG path\n", "##################################################\n", "\n", - "cohorts = aggregate_e2e_paths(relevant_paths) #all_paths)\n", + "cohorts = aggregate_e2e_paths(all_paths) #all_paths)\n", "cohort_pairs = [(k, v) for k, v in cohorts.items()]\n", "cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n", "\n", @@ -490,7 +571,7 @@ "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n", "\n", "df_print = out_df[['path', 'e2e_latency']].groupby(\"path\").agg(['count', 'mean', 'min', 'max']).reset_index()\n", - "df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n", + "#df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n", "df_print = df_print.sort_values((\"e2e_latency\", \"count\"), ascending=False)\n", "df_print.to_csv(os.path.join(OUT_PATH, \"e2e_overview.csv\"), sep=\"\\t\", index=False)\n", "df_print" @@ -504,13 +585,11 @@ "execution_count": null, "outputs": [], "source": [ - "%%skip_if_false MANUAL_CACHE\n", - "\n", "import pickle\n", "# with open(\"state.pkl\", \"wb\") as f:\n", - "# pickle.dump((trees_paths, all_paths, lidar_paths, cohorts), f)\n", - "with open(\"state.pkl\", \"rb\") as f:\n", - " (trees_paths, all_paths, relevant_paths, cohorts) = pickle.load(f)" + "# pickle.dump((trees_paths, all_paths, cohorts), f)\n", + "with open(os.path.join(OUT_PATH, \"state.pkl\"), \"wb\") as f:\n", + " pickle.dump((trees_paths, all_paths, cohorts), f)" ], "metadata": { "collapsed": false @@ -526,8 +605,13 @@ "# cannot-be-included patterns\n", "##################################################\n", "\n", + "#for k in cohorts.keys():\n", + "# print(\" \" + \"\\n-> \".join(k.split(\" -> \")))\n", + "# print()\n", + "# print()\n", + "\n", "cohorts_filt = {k: v for k, v in cohorts.items()\n", - " if not any(re.search(f, k) for f in E2E_EXCL_PATH_PATTERNS) and all(re.search(f, k) for f in E2E_INCL_PATH_PATTERNS)}\n", + " if all(re.search(f.removeprefix(\"^\").removesuffix(\"$\"), k) for f in E2E_INCL_PATH_PATTERNS)}\n", "\n", "\n", "print(len(cohorts), len(cohorts_filt))\n", @@ -557,15 +641,15 @@ "##################################################\n", "\n", "e2e_breakdowns = list(map(e2e_latency_breakdown, relevant_dataflows))\n", - "filt = [(path, bdown) for (path, bdown) in zip(relevant_dataflows, e2e_breakdowns)\n", - " if not any(True for item in bdown\n", - " if item.type == \"idle\" and item.duration > item.location[1].callback_obj.owner.period * 1e-9)]\n", + "#filt = [(path, bdown) for (path, bdown) in zip(relevant_dataflows, e2e_breakdowns)\n", + "# if not any(True for item in bdown\n", + "# if item.type == \"idle\" and item.duration > item.location[1].callback_obj.owner.period * 1e-9)]\n", "\n", "# Backup for debug purposes.\n", - "lidar_cohort_orig = relevant_dataflows\n", - "e2e_breakdowns_orig = e2e_breakdowns\n", + "#relevant_dataflows_orig = relevant_dataflows\n", + "#e2e_breakdowns_orig = e2e_breakdowns\n", "\n", - "relevant_dataflows, e2e_breakdowns = zip(*filt)" + "#relevant_dataflows, e2e_breakdowns = zip(*filt)" ], "metadata": { "collapsed": false @@ -586,7 +670,7 @@ "from message_tree.message_tree_algorithms import e2e_latency_breakdown\n", "\n", "conv_items = [i for p in e2e_breakdowns for i in p]\n", - "with open(\"out/plot_e2es_path.txt\", \"w\") as f:\n", + "with open(os.path.join(OUT_PATH, \"plot_e2es_path.txt\"), \"w\") as f:\n", " f.write(f\"Number of path instances: {len(relevant_dataflows)}\\n\")\n", " f.write( \" \" + \"\\n -> \".join(relevant_path.split(\" -> \")))\n", " f.write(\"\\n\")\n", @@ -609,7 +693,7 @@ " durations = [item.duration for item in items if item.type == type]\n", "\n", " df = pd.Series(durations)\n", - " df.to_csv(f\"out/plot_e2es_{type}_portion.csv\", header=[f\"e2e_latency_{type}_portion_s\"], index=False)\n", + " df.to_csv(os.path.join(OUT_PATH, f\"plot_e2es_{type}_portion.csv\"), header=[f\"e2e_latency_{type}_portion_s\"], index=False)\n", "\n", " ax.set_title(type)\n", " ax.hist(durations, bins=50)\n", @@ -626,7 +710,7 @@ "##################################################\n", "\n", "fig = e2e_breakdown_type_hist__(conv_items_unique)\n", - "plt.savefig(\"out/plot_e2e_portions.png\")\n", + "plt.savefig(os.path.join(OUT_PATH, \"plot_e2e_portions.png\"))\n", "\n", "None\n" ], @@ -649,7 +733,7 @@ "e2es = [path[-1].timestamp - path[0].timestamp for path in relevant_dataflows]\n", "\n", "df = pd.Series(e2es)\n", - "df.to_csv(\"out/plot_e2es.csv\", index=False, header=[\"e2e_latency_s\"])\n", + "df.to_csv(os.path.join(OUT_PATH, \"plot_e2es.csv\"), index=False, header=[\"e2e_latency_s\"])\n", "\n", "plt.close(\"E2E histogram\")\n", "fig, ax = plt.subplots(num=\"E2E histogram\", dpi=300, figsize=(16, 9))\n", @@ -661,7 +745,7 @@ "ax.axvline(np.mean(e2es), c=\"red\", linewidth=2)\n", "_, max_ylim = ax.get_ylim()\n", "ax.text(np.mean(e2es) * 1.02, max_ylim * 0.98, 'Mean: {:.3f}s'.format(np.mean(e2es)))\n", - "plt.savefig(\"out/plot_e2es.png\")\n", + "plt.savefig(os.path.join(OUT_PATH, \"plot_e2es.png\"))\n", "None" ], "metadata": { @@ -709,17 +793,17 @@ " add_label(vln, type)\n", " for i, x in zip(indices, xs):\n", " df_out = pd.Series(x)\n", - " df_out.to_csv(f\"out/plot_e2es_violin_{i:02d}.csv\", index=False, header=[\"duration_s\"])\n", + " df_out.to_csv(os.path.join(OUT_PATH, f\"plot_e2es_violin_{i:02d}.csv\"), index=False, header=[\"duration_s\"])\n", "ax.set_ylabel(\"Latency contribution [s]\")\n", "ax.set_xticks(range(len(labels)), labels, rotation=90)\n", "ax.legend(*zip(*legend_entries))\n", - "plt.savefig(\"out/plot_e2es_violin.png\")\n", + "plt.savefig(os.path.join(OUT_PATH, \"plot_e2es_violin.png\"))\n", "\n", "df_labels = pd.Series(labels)\n", - "df_labels.to_csv(\"out/plot_e2es_violin_labels.csv\", index=False, header=[\"label\"])\n", + "df_labels.to_csv(os.path.join(OUT_PATH, \"plot_e2es_violin_labels.csv\"), index=False, header=[\"label\"])\n", "\n", "df_types = pd.Series(types)\n", - "df_types.to_csv(\"out/plot_e2es_violin_types.csv\", index=False, header=[\"type\"])\n", + "df_types.to_csv(os.path.join(OUT_PATH, \"plot_e2es_violin_types.csv\"), index=False, header=[\"type\"])\n", "\n", "None" ], @@ -737,7 +821,7 @@ " continue\n", " dur_ts_records = [(item.location[0].timestamp, item.duration) for item in concat_pc_items]\n", " df_dur_ts = pd.DataFrame(dur_ts_records, columns=(\"timestamp\", \"duration\"))\n", - " df_dur_ts.to_csv(f\"dur_ts_{concat_pc_items[0].location[0].publisher.topic_name.replace('/', '__')}.csv\", index=False)" + " df_dur_ts.to_csv(os.path.join(OUT_PATH, f\"dur_ts_{concat_pc_items[0].location[0].publisher.topic_name.replace('/', '__')}.csv\"), index=False)" ], "metadata": { "collapsed": false @@ -754,7 +838,7 @@ "\n", "records = [(lbl, x.mean(), x.std(), x.min(), x.max()) for x, lbl in zip(xs, lbls)]\n", "df_cpu = pd.DataFrame(records, columns=[\"callback\", \"mean\", \"std\", \"min\", \"max\"])\n", - "df_cpu.to_csv(\"out/calc_times.csv\", index=False)" + "df_cpu.to_csv(os.path.join(OUT_PATH, \"calc_times.csv\"), index=False)" ], "metadata": { "collapsed": false @@ -784,33 +868,7 @@ "execution_count": null, "outputs": [], "source": [ - "%%skip_if_false E2E_ENABLED\n", - "%%skip_if_false DEBUG\n", - "\n", - "import pickle\n", - "\n", - "with open(\"trees.pkl\", \"wb\") as f:\n", - " pickle.dump(trees, f)" - ], - "metadata": { - "collapsed": false - } - }, - { - "cell_type": "code", - "execution_count": null, - "outputs": [], - "source": [ - "ctx = relevant_paths[0][0]._c\n", - "topics = ctx.topics\n", - "\n", - "topics_of_interest = [item.publisher.topic_name for item in relevant_dataflows[0] if isinstance(item, TrPublishInstance)]\n", - "\n", - "topic_records = []\n", - "for t in topics_of_interest:\n", - " topic_records.append((t, topics.by_name[t].subscriptions[0]))\n", - "\n", - "print(topic_records)" + "print(\"Done.\")" ], "metadata": { "collapsed": false diff --git a/tracing_interop/tr_types.py b/tracing_interop/tr_types.py index e94a91f..c799c68 100644 --- a/tracing_interop/tr_types.py +++ b/tracing_interop/tr_types.py @@ -105,8 +105,11 @@ class TrContext: id=False) self.publishers = Index(df_to_type_list(handler.data.rcl_publishers, TrPublisher, _c=self), id=False, node_handle=True, topic_name=True) - self.subscriptions = Index(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription, _c=self), - id=False, node_handle=True, topic_name=True) + self.subscriptions = Index(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription, + column_value_mappers={"topic_name": lambda n: [n]}, + column_to_field_mappings={"topic_name": "topic_names"}, + _c=self), + id=False, node_handle=True, topic_names='n-to-m') self.timers = Index(df_to_type_list(handler.data.timers, TrTimer, _c=self), id=False) self.timer_node_links = Index(df_to_type_list(handler.data.timer_node_links, TrTimerNodeLink), @@ -119,15 +122,15 @@ class TrContext: self.callback_symbols = Index(df_to_type_list(handler.data.callback_symbols, TrCallbackSymbol, _c=self), id=False) self.publish_instances = Index(df_to_type_list(handler.data.rcl_publish_instances, TrPublishInstance, _c=self, - mappers={"timestamp": lambda t: t * 1e-9}), + column_value_mappers={"timestamp": lambda t: t * 1e-9}), publisher_handle=True) self.callback_instances = Index(df_to_type_list(handler.data.callback_instances, TrCallbackInstance, _c=self, - mappers={"timestamp": lambda t: t.timestamp(), + column_value_mappers={"timestamp": lambda t: t.timestamp(), "duration": lambda d: d.total_seconds()}), callback_object=True) _unique_topic_names = {*(pub.topic_name for pub in self.publishers), - *(sub.topic_name for sub in self.subscriptions)} + *(n for sub in self.subscriptions for n in sub.topic_names)} self.topics = Index((TrTopic(name=name, _c=self) for name in _unique_topic_names), name=False) @@ -195,7 +198,7 @@ class TrPublisher: @property def subscriptions(self) -> List['TrSubscription']: - return self._c.subscriptions.by_topic_name.get(self.topic_name) or [] + return self._c.subscriptions.by_topic_names.get(self.topic_name) or [] @property def instances(self) -> List['TrPublishInstance']: @@ -218,7 +221,7 @@ class TrSubscription: timestamp: int node_handle: int rmw_handle: int - topic_name: str + topic_names: List[str] depth: int _c: TrContext = field(repr=False) @@ -228,15 +231,15 @@ class TrSubscription: @property def publishers(self) -> List['TrPublisher']: - return self._c.publishers.by_topic_name.get(self.topic_name) or [] + return list(set(p for n in self.topic_names for p in self._c.publishers.by_topic_name.get(n))) @property def subscription_object(self) -> Optional['TrSubscriptionObject']: return self._c.subscription_objects.by_subscription_handle.get(self.id) @property - def topic(self) -> Optional['TrTopic']: - return self._c.topics.by_name.get(self.topic_name) + def topics(self) -> List['TrTopic']: + return list(set(self._c.topics.by_name.get(n) for n in self.topic_names)) def __hash__(self): return hash(self.id) @@ -417,7 +420,7 @@ class TrTopic: @property def subscriptions(self) -> List['TrSubscription']: - return self._c.subscriptions.by_topic_name.get(self.name) or [] + return self._c.subscriptions.by_topic_names.get(self.name) or [] def __hash__(self): return hash(self.name)