Allow for better TF2 insight and analysis [WIP], write more outputs to the specified out dir and not to the project dir, add E2E_EXACT_PATH setting to speed up execution for known paths

This commit is contained in:
Maximilian Schmeller 2022-12-25 20:47:56 +09:00
parent e2cdfade31
commit dc3a8084b1
5 changed files with 185 additions and 118 deletions

View file

@ -113,7 +113,7 @@ def _get_cb_topic_deps(nodes_to_cbs: Dict[TrNode, Set[TrCallbackObject]]):
p.update()
if type(cb.owner) == TrSubscriptionObject:
dep_topics = [cb.owner.subscription.topic]
dep_topics = cb.owner.subscription.topics
elif type(cb.owner) == TrTimer:
dep_topics = []
elif cb.owner is None:

View file

@ -260,7 +260,7 @@ def match(tr: TrContext, cl: ClContext):
match tr_cb.owner:
case TrSubscriptionObject(subscription=tr_sub):
tr_sub: TrSubscription
tr_topic = tr_sub.topic_name
tr_topic = tr_sub.topic_names # TODO: str->list type refactoring
if not tr_topic:
continue
case _:

View file

@ -2,7 +2,7 @@ import re
from bisect import bisect_right
from collections import defaultdict
from functools import cache
from typing import List
from typing import List, Optional
from tqdm import tqdm
@ -30,7 +30,7 @@ def _repr(inst: TrCallbackInstance | TrPublishInstance):
def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: LatencyGraph, tr: TrContext,
excluded_path_patterns, time_limit_s):
excluded_path_patterns, time_limit_s, exact_path: Optional[List[str]] = None):
"""
Finds all (desired) dependencies of a publish/callback instance and returns a dependency tree.
@ -43,12 +43,19 @@ def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: Latenc
start_time = inst.timestamp
def __get_dep_tree(inst, is_dep_cb, visited=None):
def __get_dep_tree(inst, is_dep_cb, visited=None, level=0):
# If inst owner has been visited already, skip (loop prevention)
if visited is not None and owner(inst) in visited:
return None
# If an exact path is given, the owner of `inst` at `level` has to be identical to the corresponding item
# in `exact_path`. The level is non-positive (start at 0, descend one per recursion).
# `exact_path` is therefore traversed from back to front.
# If `level` is deeper than `exact_path` is long, the tree is also not of interest and thus discarded.
if exact_path is not None and ((level - 1 < -len(exact_path)) or exact_path[level - 1] != owner(inst)):
return None
# If we want to retrieve the tree, look in the cache first
cache_key = (inst, is_dep_cb)
if cache_key in __dep_tree_cache:
@ -81,7 +88,7 @@ def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: Latenc
# print("Rec level", lvl)
deps = list(filter(None, deps))
# Copy visited set for each child because we don't want separate paths to interfere with each other
deps = [__get_dep_tree(dep, children_are_dep_cbs, {*visited, owner(inst)}) for dep in deps]
deps = [__get_dep_tree(dep, children_are_dep_cbs, {*visited, owner(inst)}, level=level-1) for dep in deps]
deps = list(filter(None, deps))
# Create tree instance, cache and return it
@ -92,7 +99,7 @@ def get_dep_tree(inst: TrPublishInstance | TrCallbackInstance, lat_graph: Latenc
return __get_dep_tree(inst, False)
def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limit_s):
def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limit_s, exact_path=None):
"""
Builds the dependency trees for all messages published in any of `end_topics` and returns them as a list.
"""
@ -106,7 +113,7 @@ def build_dep_trees(end_topics, lat_graph, tr, excluded_path_patterns, time_limi
msgs = list(pub.instances)
for msg in tqdm(msgs, mininterval=5.0, desc="Processing output messages"):
msg: TrPublishInstance
tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s)
tree = get_dep_tree(msg, lat_graph, tr, excluded_path_patterns, time_limit_s, exact_path=exact_path)
all_trees.append(tree)
return all_trees
@ -121,9 +128,9 @@ def inst_get_dep_msg(inst: TrCallbackInstance, tr: TrContext):
return None
sub_obj: TrSubscriptionObject = inst.callback_obj.owner
if sub_obj and sub_obj.subscription and sub_obj.subscription.topic:
if sub_obj and sub_obj.subscription and sub_obj.subscription.topics:
# print(f"Subscription has no topic")
pubs = sub_obj.subscription.topic.publishers
pubs = [p for t in sub_obj.subscription.topics for p in t.publishers]
else:
pubs = []
@ -144,7 +151,6 @@ def inst_get_dep_msg(inst: TrCallbackInstance, tr: TrContext):
msg = msgs[0]
return msg
# print(f"No messages found for topic {sub_obj.subscription.topic}")
return None

View file

@ -8,16 +8,8 @@
"import os\n",
"import sys\n",
"import re\n",
"import math\n",
"from tqdm import tqdm\n",
"from bisect import bisect_right\n",
"from termcolor import colored\n",
"\n",
"import matplotlib.patches as mpatch\n",
"from scipy import stats\n",
"from cycler import cycler\n",
"\n",
"import glob\n",
"from typing import Iterable\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
@ -70,7 +62,7 @@
"# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n",
"# if kernel traces are also present.\n",
"# TR_PATH = \"~/Downloads/iteration1_worker1/aw_replay/tracing/scenario-trace/ust\"\n",
"TR_PATH = \"data/trace-awsim-x86/ust\"\n",
"TR_PATH = \"/home/max/Projects/ma-measurements/artifacts_20221223_155956/tracing/max-ma-trace/ust\"\n",
"\n",
"# Path to the folder all artifacts from this notebook are saved to.\n",
"# This entails plots as well as data tables.\n",
@ -80,7 +72,7 @@
"CACHING_ENABLED = False\n",
"\n",
"# Whether to annotate topics/publications with bandwidth/message size\n",
"BW_ENABLED = True\n",
"BW_ENABLED = False\n",
"# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n",
"# Used to annotate message sizes in E2E latency calculations\n",
"BW_PATH = \"../ma-hw-perf-tools/data/messages-x86.h5\"\n",
@ -95,9 +87,9 @@
"\n",
"# Whether to compute data flow graphs.\n",
"# If you are only interested in E2E latencies, set this to False\n",
"DFG_ENABLED = True\n",
"DFG_ENABLED = False\n",
"# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n",
"DFG_PLOT = True\n",
"DFG_PLOT = False\n",
"\n",
"# The maximum node namespace hierarchy level to be plotted.\n",
"# Top-level (1): e.g. /sensing, /control, etc.\n",
@ -109,9 +101,9 @@
"DFG_INPUT_NODE_PATTERNS = [r\"^/sensing\"]\n",
"# RegEx pattern for nodes that shall be marked as system outputs\n",
"# These will be plotted with a double border\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/awapi\", r\"^/control/external_cmd_converter\", \"emergency\"]\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/control/external_cmd_converter\"]\n",
"# RegEx for nodes which shall not be plotted in the DFG\n",
"DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\", r\"transform_listener_impl\"]\n",
"DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\"]\n",
"\n",
"# Whether to compute E2E latencies.\n",
"E2E_ENABLED = True\n",
@ -131,15 +123,48 @@
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/command/control_cmd$\"]\n",
"# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n",
"# E.g. r\"^/sensing/\" will cover all sensing topics\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/sensing/.*?pointcloud\"]\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"pointcloud\"]\n",
"\n",
"# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n",
"# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n",
"E2E_EXCL_PATH_PATTERNS = [r\"NDTScanMatcher\", r\"^/parameter_events\", \"hazard\", \"turn_indicator\", \"gear_cmd\", \"emergency_cmd\",\n",
" \"external_cmd\", \"/control/operation_mode\", \"/planning/scenario_planning/scenario$\"]\n",
"E2E_EXCL_PATH_PATTERNS = [r\"^/parameter_events\", \"hazard\", \"turn_indicator\", \"gear_cmd\", \"emergency_cmd\", \"emergency_state\",\n",
" \"external_cmd\", \"/control/operation_mode\"]\n",
"\n",
"# To specify paths of interest, topic/callback name patterns that HAVE TO OCCUR in each E2E path can be specified as RegEx here.\n",
"E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n",
"#E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n",
"E2E_INCL_PATH_PATTERNS = []\n",
"\n",
"# If an exact path through the system is known, this variabe can be set to a list (order matters!) of all elements on the path.\n",
"# The first item ist the one at the \"input end\" of the system, the last one the \"output end\".\n",
"E2E_EXACT_PATH = [\n",
" \"void(pointcloud_preprocessor::Filter)(sensor_msgs::msg::PointCloud2,pcl_msgs::msg::PointIndices)\",\n",
" \"/localization/util/downsample/pointcloud\",\n",
" \"void(NDTScanMatcher)(sensor_msgs::msg::PointCloud2)\",\n",
" \"/localization/pose_estimator/pose_with_covariance\",\n",
" \"void(EKFLocalizer)(geometry_msgs::msg::PoseWithCovarianceStamped)\",\n",
" \"void(EKFLocalizer)()\",\n",
" \"/localization/pose_twist_fusion_filter/kinematic_state\",\n",
" \"void(StopFilter)(nav_msgs::msg::Odometry)\",\n",
" \"/localization/kinematic_state\",\n",
" \"void(behavior_path_planner::BehaviorPathPlannerNode)(nav_msgs::msg::Odometry)\",\n",
" \"void(behavior_path_planner::BehaviorPathPlannerNode)()\",\n",
" \"/planning/scenario_planning/lane_driving/behavior_planning/path_with_lane_id\",\n",
" \"void(behavior_velocity_planner::BehaviorVelocityPlannerNode)(autoware_auto_planning_msgs::msg::PathWithLaneId)\",\n",
" \"/planning/scenario_planning/lane_driving/behavior_planning/path\",\n",
" \"void(ObstacleAvoidancePlanner)(autoware_auto_planning_msgs::msg::Path)\",\n",
" \"/planning/scenario_planning/lane_driving/motion_planning/obstacle_avoidance_planner/trajectory\",\n",
" \"void(motion_planning::ObstacleStopPlannerNode)(autoware_auto_planning_msgs::msg::Trajectory)\",\n",
" \"/planning/scenario_planning/lane_driving/trajectory\",\n",
" \"void(ScenarioSelectorNode)(autoware_auto_planning_msgs::msg::Trajectory)\",\n",
" \"/planning/scenario_planning/scenario_selector/trajectory\",\n",
" \"void(motion_velocity_smoother::MotionVelocitySmootherNode)(autoware_auto_planning_msgs::msg::Trajectory)\",\n",
" \"/planning/scenario_planning/trajectory\",\n",
" \"void(autoware::motion::control::trajectory_follower_nodes::Controller)(autoware_auto_planning_msgs::msg::Trajectory)\",\n",
" \"void(autoware::motion::control::trajectory_follower_nodes::Controller)()\",\n",
" \"/control/trajectory_follower/control_cmd\",\n",
" \"void(vehicle_cmd_gate::VehicleCmdGate)(autoware_auto_control_msgs::msg::AckermannControlCommand)\",\n",
" \"/control/command/control_cmd\",\n",
"]\n",
"\n",
"# For development purposes only. Leave this at False.\n",
"DEBUG = False\n",
@ -181,6 +206,8 @@
"for k, v in globals().copy().items():\n",
" if not k.isupper():\n",
" continue\n",
" if isinstance(v, Iterable) and not isinstance(v, str):\n",
" v = (\"\\n \" + (\" \" * 44)).join(list(map(str, v)))\n",
" print(f\" {k:.<40s} := {v}\")"
],
"metadata": {
@ -235,7 +262,17 @@
" \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n",
"\n",
"# Help the IDE recognize those identifiers by assigning a dummy value to their name.\n",
"nodes = publishers = subscriptions = timers = timer_node_links = subscription_objects = callback_objects = callback_symbols = publish_instances = callback_instances = topics = None\n",
"nodes: Index = Index([])\n",
"publishers: Index = Index([])\n",
"subscriptions: Index = Index([])\n",
"timers: Index = Index([])\n",
"timer_node_links: Index = Index([])\n",
"subscription_objects: Index = Index([])\n",
"callback_objects: Index = Index([])\n",
"callback_symbols: Index = Index([])\n",
"publish_instances: Index = Index([])\n",
"callback_instances: Index = Index([])\n",
"topics: Index = Index([])\n",
"\n",
"for name in _tr_globals:\n",
" globals()[name] = getattr(_tracing_context, name)\n",
@ -246,6 +283,67 @@
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"##################################################\n",
"# Split /tf into Artificial Topics\n",
"##################################################\n",
"# /tf is one ROS2 topic but carries multiple\n",
"# different streams of information:\n",
"# One for each publisher of the /tf topic.\n",
"# We can separate the messages on /tf according\n",
"# to those publishers.\n",
"# Still, all subscribers to /tf are subscribed\n",
"# to ALL of those split topics, so the\n",
"# found dependencies are still a vast over-\n",
"# approximation of the actual dataflow.\n",
"##################################################\n",
"\n",
"tf_topic: TrTopic = topics.by_name[\"/tf\"]\n",
"\n",
"tf_split_topics = []\n",
"for pub in tf_topic.publishers:\n",
" pub: TrPublisher\n",
" topic = TrTopic(f\"/tf[{pub.node.path}_{pub.id}]\", _tracing_context)\n",
" pub.topic_name = topic.name\n",
" for sub in tf_topic.subscriptions:\n",
" sub.topic_names.append(topic.name)\n",
"\n",
" tf_split_topics.append(topic)\n",
"\n",
"print(\"Rebuilding subscription index\")\n",
"subscriptions.rebuild()\n",
"print(\"Rebuilding publisher index\")\n",
"publishers.rebuild()\n",
"print(\"Adding new topics and rebuilding topic index\")\n",
"topics.append(tf_split_topics)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"print(\"Artificial TF topics (/tf[...]):\")\n",
"for t in tf_split_topics:\n",
" print(f\"{t.name:.<110s} | {len(t.publishers)} pubs, {sum(map(lambda p: len(p.instances), t.publishers))}\")\n",
" for sub in t.subscriptions:\n",
" print(f\" {sub.node.path:.<106s} | {len(sub.subscription_object.callback_object.callback_instances)} {len(sub.node.publishers)} {sum(map(lambda p: len(p.instances), sub.node.publishers))}\")\n",
" #prefix = os.path.split(sub.node.path)[0]\n",
" #for node in nodes:\n",
" # if node.path.startswith(prefix) and node.path.removeprefix(prefix).count(\"/\") <= 1 and \"transform_listener_impl\" not in node.path:\n",
" # print(f\" -> {' ' * (len(prefix) - 3)}{node.path.removeprefix(prefix)}\")"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
@ -257,19 +355,19 @@
"\n",
"for topic in sorted(topics, key=lambda t: t.name):\n",
" topic: TrTopic\n",
" print(f\"{topic.name:.<120s} | {sum(map(lambda p: len(p.instances), topic.publishers))}\")\n",
" print(f\"{topic.name:.<130s} | {sum(map(lambda p: len(p.instances), topic.publishers)):>5d} msgs\")\n",
"\n",
"print(\"\\n[DEBUG] INPUT TOPICS\")\n",
"for t in sorted(topics, key=lambda t: t.name):\n",
" for f in E2E_INPUT_TOPIC_PATTERNS:\n",
" if re.search(f, t.name):\n",
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers))}\")\n",
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")\n",
"\n",
"print(\"\\n[DEBUG] OUTPUT TOPICS\")\n",
"for t in sorted(topics, key=lambda t: t.name):\n",
" for f in E2E_OUTPUT_TOPIC_PATTERNS:\n",
" if re.search(f, t.name):\n",
" print(f\"--[ENTKÄFERN] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers))}\")"
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")"
],
"metadata": {
"collapsed": false
@ -327,7 +425,7 @@
"\n",
"from latency_graph.latency_graph_plots import plot_latency_graph_full\n",
"\n",
"plot_latency_graph_full(lat_graph, _tracing_context, \"latency_graph_full\")"
"plot_latency_graph_full(lat_graph, _tracing_context, os.path.join(OUT_PATH, \"latency_graph_full\"))"
],
"metadata": {
"collapsed": false
@ -358,7 +456,7 @@
"\n",
"from latency_graph.latency_graph_plots import plot_latency_graph_overview\n",
"\n",
"plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, \"latency_graph_overview\")"
"plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, os.path.join(OUT_PATH, \"latency_graph_overview\"))"
],
"metadata": {
"collapsed": false,
@ -389,7 +487,7 @@
"end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n",
"\n",
"def _build_dep_trees():\n",
" return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S)\n",
" return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S, exact_path=E2E_EXACT_PATH)\n",
"\n",
"try:\n",
" trees = cached(\"trees\", _build_dep_trees, [TR_PATH], not CACHING_ENABLED)\n",
@ -425,21 +523,6 @@
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"%%skip_if_false DEBUG\n",
"\n",
"import pickle\n",
"with open(\"trees.pkl\", \"rb\") as f:\n",
" trees = pickle.load(f)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
@ -456,9 +539,7 @@
"\n",
"trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0,\n",
" desc=\"Extracting E2E paths\")]\n",
"all_paths = [p for paths in trees_paths for p in paths]\n",
"relevant_paths = [p for p in all_paths\n",
" if any(map(lambda inst: re.search(\"^/sensing/.*?pointcloud\", owner(inst)), p))]\n"
"all_paths = [p for paths in trees_paths for p in paths]"
],
"metadata": {
"collapsed": false
@ -477,7 +558,7 @@
"# Group dataflows by DFG path\n",
"##################################################\n",
"\n",
"cohorts = aggregate_e2e_paths(relevant_paths) #all_paths)\n",
"cohorts = aggregate_e2e_paths(all_paths) #all_paths)\n",
"cohort_pairs = [(k, v) for k, v in cohorts.items()]\n",
"cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n",
"\n",
@ -490,7 +571,7 @@
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n",
"\n",
"df_print = out_df[['path', 'e2e_latency']].groupby(\"path\").agg(['count', 'mean', 'min', 'max']).reset_index()\n",
"df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n",
"#df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n",
"df_print = df_print.sort_values((\"e2e_latency\", \"count\"), ascending=False)\n",
"df_print.to_csv(os.path.join(OUT_PATH, \"e2e_overview.csv\"), sep=\"\\t\", index=False)\n",
"df_print"
@ -504,13 +585,11 @@
"execution_count": null,
"outputs": [],
"source": [
"%%skip_if_false MANUAL_CACHE\n",
"\n",
"import pickle\n",
"# with open(\"state.pkl\", \"wb\") as f:\n",
"# pickle.dump((trees_paths, all_paths, lidar_paths, cohorts), f)\n",
"with open(\"state.pkl\", \"rb\") as f:\n",
" (trees_paths, all_paths, relevant_paths, cohorts) = pickle.load(f)"
"# pickle.dump((trees_paths, all_paths, cohorts), f)\n",
"with open(os.path.join(OUT_PATH, \"state.pkl\"), \"wb\") as f:\n",
" pickle.dump((trees_paths, all_paths, cohorts), f)"
],
"metadata": {
"collapsed": false
@ -526,8 +605,13 @@
"# cannot-be-included patterns\n",
"##################################################\n",
"\n",
"#for k in cohorts.keys():\n",
"# print(\" \" + \"\\n-> \".join(k.split(\" -> \")))\n",
"# print()\n",
"# print()\n",
"\n",
"cohorts_filt = {k: v for k, v in cohorts.items()\n",
" if not any(re.search(f, k) for f in E2E_EXCL_PATH_PATTERNS) and all(re.search(f, k) for f in E2E_INCL_PATH_PATTERNS)}\n",
" if all(re.search(f.removeprefix(\"^\").removesuffix(\"$\"), k) for f in E2E_INCL_PATH_PATTERNS)}\n",
"\n",
"\n",
"print(len(cohorts), len(cohorts_filt))\n",
@ -557,15 +641,15 @@
"##################################################\n",
"\n",
"e2e_breakdowns = list(map(e2e_latency_breakdown, relevant_dataflows))\n",
"filt = [(path, bdown) for (path, bdown) in zip(relevant_dataflows, e2e_breakdowns)\n",
" if not any(True for item in bdown\n",
" if item.type == \"idle\" and item.duration > item.location[1].callback_obj.owner.period * 1e-9)]\n",
"#filt = [(path, bdown) for (path, bdown) in zip(relevant_dataflows, e2e_breakdowns)\n",
"# if not any(True for item in bdown\n",
"# if item.type == \"idle\" and item.duration > item.location[1].callback_obj.owner.period * 1e-9)]\n",
"\n",
"# Backup for debug purposes.\n",
"lidar_cohort_orig = relevant_dataflows\n",
"e2e_breakdowns_orig = e2e_breakdowns\n",
"#relevant_dataflows_orig = relevant_dataflows\n",
"#e2e_breakdowns_orig = e2e_breakdowns\n",
"\n",
"relevant_dataflows, e2e_breakdowns = zip(*filt)"
"#relevant_dataflows, e2e_breakdowns = zip(*filt)"
],
"metadata": {
"collapsed": false
@ -586,7 +670,7 @@
"from message_tree.message_tree_algorithms import e2e_latency_breakdown\n",
"\n",
"conv_items = [i for p in e2e_breakdowns for i in p]\n",
"with open(\"out/plot_e2es_path.txt\", \"w\") as f:\n",
"with open(os.path.join(OUT_PATH, \"plot_e2es_path.txt\"), \"w\") as f:\n",
" f.write(f\"Number of path instances: {len(relevant_dataflows)}\\n\")\n",
" f.write( \" \" + \"\\n -> \".join(relevant_path.split(\" -> \")))\n",
" f.write(\"\\n\")\n",
@ -609,7 +693,7 @@
" durations = [item.duration for item in items if item.type == type]\n",
"\n",
" df = pd.Series(durations)\n",
" df.to_csv(f\"out/plot_e2es_{type}_portion.csv\", header=[f\"e2e_latency_{type}_portion_s\"], index=False)\n",
" df.to_csv(os.path.join(OUT_PATH, f\"plot_e2es_{type}_portion.csv\"), header=[f\"e2e_latency_{type}_portion_s\"], index=False)\n",
"\n",
" ax.set_title(type)\n",
" ax.hist(durations, bins=50)\n",
@ -626,7 +710,7 @@
"##################################################\n",
"\n",
"fig = e2e_breakdown_type_hist__(conv_items_unique)\n",
"plt.savefig(\"out/plot_e2e_portions.png\")\n",
"plt.savefig(os.path.join(OUT_PATH, \"plot_e2e_portions.png\"))\n",
"\n",
"None\n"
],
@ -649,7 +733,7 @@
"e2es = [path[-1].timestamp - path[0].timestamp for path in relevant_dataflows]\n",
"\n",
"df = pd.Series(e2es)\n",
"df.to_csv(\"out/plot_e2es.csv\", index=False, header=[\"e2e_latency_s\"])\n",
"df.to_csv(os.path.join(OUT_PATH, \"plot_e2es.csv\"), index=False, header=[\"e2e_latency_s\"])\n",
"\n",
"plt.close(\"E2E histogram\")\n",
"fig, ax = plt.subplots(num=\"E2E histogram\", dpi=300, figsize=(16, 9))\n",
@ -661,7 +745,7 @@
"ax.axvline(np.mean(e2es), c=\"red\", linewidth=2)\n",
"_, max_ylim = ax.get_ylim()\n",
"ax.text(np.mean(e2es) * 1.02, max_ylim * 0.98, 'Mean: {:.3f}s'.format(np.mean(e2es)))\n",
"plt.savefig(\"out/plot_e2es.png\")\n",
"plt.savefig(os.path.join(OUT_PATH, \"plot_e2es.png\"))\n",
"None"
],
"metadata": {
@ -709,17 +793,17 @@
" add_label(vln, type)\n",
" for i, x in zip(indices, xs):\n",
" df_out = pd.Series(x)\n",
" df_out.to_csv(f\"out/plot_e2es_violin_{i:02d}.csv\", index=False, header=[\"duration_s\"])\n",
" df_out.to_csv(os.path.join(OUT_PATH, f\"plot_e2es_violin_{i:02d}.csv\"), index=False, header=[\"duration_s\"])\n",
"ax.set_ylabel(\"Latency contribution [s]\")\n",
"ax.set_xticks(range(len(labels)), labels, rotation=90)\n",
"ax.legend(*zip(*legend_entries))\n",
"plt.savefig(\"out/plot_e2es_violin.png\")\n",
"plt.savefig(os.path.join(OUT_PATH, \"plot_e2es_violin.png\"))\n",
"\n",
"df_labels = pd.Series(labels)\n",
"df_labels.to_csv(\"out/plot_e2es_violin_labels.csv\", index=False, header=[\"label\"])\n",
"df_labels.to_csv(os.path.join(OUT_PATH, \"plot_e2es_violin_labels.csv\"), index=False, header=[\"label\"])\n",
"\n",
"df_types = pd.Series(types)\n",
"df_types.to_csv(\"out/plot_e2es_violin_types.csv\", index=False, header=[\"type\"])\n",
"df_types.to_csv(os.path.join(OUT_PATH, \"plot_e2es_violin_types.csv\"), index=False, header=[\"type\"])\n",
"\n",
"None"
],
@ -737,7 +821,7 @@
" continue\n",
" dur_ts_records = [(item.location[0].timestamp, item.duration) for item in concat_pc_items]\n",
" df_dur_ts = pd.DataFrame(dur_ts_records, columns=(\"timestamp\", \"duration\"))\n",
" df_dur_ts.to_csv(f\"dur_ts_{concat_pc_items[0].location[0].publisher.topic_name.replace('/', '__')}.csv\", index=False)"
" df_dur_ts.to_csv(os.path.join(OUT_PATH, f\"dur_ts_{concat_pc_items[0].location[0].publisher.topic_name.replace('/', '__')}.csv\"), index=False)"
],
"metadata": {
"collapsed": false
@ -754,7 +838,7 @@
"\n",
"records = [(lbl, x.mean(), x.std(), x.min(), x.max()) for x, lbl in zip(xs, lbls)]\n",
"df_cpu = pd.DataFrame(records, columns=[\"callback\", \"mean\", \"std\", \"min\", \"max\"])\n",
"df_cpu.to_csv(\"out/calc_times.csv\", index=False)"
"df_cpu.to_csv(os.path.join(OUT_PATH, \"calc_times.csv\"), index=False)"
],
"metadata": {
"collapsed": false
@ -784,33 +868,7 @@
"execution_count": null,
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false DEBUG\n",
"\n",
"import pickle\n",
"\n",
"with open(\"trees.pkl\", \"wb\") as f:\n",
" pickle.dump(trees, f)"
],
"metadata": {
"collapsed": false
}
},
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"ctx = relevant_paths[0][0]._c\n",
"topics = ctx.topics\n",
"\n",
"topics_of_interest = [item.publisher.topic_name for item in relevant_dataflows[0] if isinstance(item, TrPublishInstance)]\n",
"\n",
"topic_records = []\n",
"for t in topics_of_interest:\n",
" topic_records.append((t, topics.by_name[t].subscriptions[0]))\n",
"\n",
"print(topic_records)"
"print(\"Done.\")"
],
"metadata": {
"collapsed": false

View file

@ -105,8 +105,11 @@ class TrContext:
id=False)
self.publishers = Index(df_to_type_list(handler.data.rcl_publishers, TrPublisher, _c=self),
id=False, node_handle=True, topic_name=True)
self.subscriptions = Index(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription, _c=self),
id=False, node_handle=True, topic_name=True)
self.subscriptions = Index(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription,
column_value_mappers={"topic_name": lambda n: [n]},
column_to_field_mappings={"topic_name": "topic_names"},
_c=self),
id=False, node_handle=True, topic_names='n-to-m')
self.timers = Index(df_to_type_list(handler.data.timers, TrTimer, _c=self),
id=False)
self.timer_node_links = Index(df_to_type_list(handler.data.timer_node_links, TrTimerNodeLink),
@ -119,15 +122,15 @@ class TrContext:
self.callback_symbols = Index(df_to_type_list(handler.data.callback_symbols, TrCallbackSymbol, _c=self),
id=False)
self.publish_instances = Index(df_to_type_list(handler.data.rcl_publish_instances, TrPublishInstance, _c=self,
mappers={"timestamp": lambda t: t * 1e-9}),
column_value_mappers={"timestamp": lambda t: t * 1e-9}),
publisher_handle=True)
self.callback_instances = Index(df_to_type_list(handler.data.callback_instances, TrCallbackInstance, _c=self,
mappers={"timestamp": lambda t: t.timestamp(),
column_value_mappers={"timestamp": lambda t: t.timestamp(),
"duration": lambda d: d.total_seconds()}),
callback_object=True)
_unique_topic_names = {*(pub.topic_name for pub in self.publishers),
*(sub.topic_name for sub in self.subscriptions)}
*(n for sub in self.subscriptions for n in sub.topic_names)}
self.topics = Index((TrTopic(name=name, _c=self) for name in _unique_topic_names),
name=False)
@ -195,7 +198,7 @@ class TrPublisher:
@property
def subscriptions(self) -> List['TrSubscription']:
return self._c.subscriptions.by_topic_name.get(self.topic_name) or []
return self._c.subscriptions.by_topic_names.get(self.topic_name) or []
@property
def instances(self) -> List['TrPublishInstance']:
@ -218,7 +221,7 @@ class TrSubscription:
timestamp: int
node_handle: int
rmw_handle: int
topic_name: str
topic_names: List[str]
depth: int
_c: TrContext = field(repr=False)
@ -228,15 +231,15 @@ class TrSubscription:
@property
def publishers(self) -> List['TrPublisher']:
return self._c.publishers.by_topic_name.get(self.topic_name) or []
return list(set(p for n in self.topic_names for p in self._c.publishers.by_topic_name.get(n)))
@property
def subscription_object(self) -> Optional['TrSubscriptionObject']:
return self._c.subscription_objects.by_subscription_handle.get(self.id)
@property
def topic(self) -> Optional['TrTopic']:
return self._c.topics.by_name.get(self.topic_name)
def topics(self) -> List['TrTopic']:
return list(set(self._c.topics.by_name.get(n) for n in self.topic_names))
def __hash__(self):
return hash(self.id)
@ -417,7 +420,7 @@ class TrTopic:
@property
def subscriptions(self) -> List['TrSubscription']:
return self._c.subscriptions.by_topic_name.get(self.name) or []
return self._c.subscriptions.by_topic_names.get(self.name) or []
def __hash__(self):
return hash(self.name)