{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import os\n",
"import sys\n",
"import re\n",
"from tqdm import tqdm\n",
"from typing import Iterable\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"from matplotlib import pyplot as plt\n",
"\n",
"from misc.utils import cached, parse_as\n",
"\n",
"# TODO: This has no effect for some reason (goal is to make graphs in notebook large and hi-res)\n",
"plt.rcParams['figure.dpi'] = 300\n",
"\n",
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# User Settings"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"User Settings:\n",
" TRACING_WS_BUILD_PATH................... := /home/niklas/dataflow-analysis/dependencies/build\n",
" TR_PATH................................. := /home/niklas/dataflow-analysis/traces/full_topology_tracing-20250603152153/ust\n",
" OUT_PATH................................ := /home/niklas/dataflow-analysis/out\n",
" CACHING_ENABLED......................... := False\n",
" BW_ENABLED.............................. := False\n",
" BW_PATH................................. := /home/niklas/dataflow-analysis/path/to/messages.h5\n",
" CL_ENABLED.............................. := False\n",
" CL_PATH................................. := /path/to/code_analysis/output\n",
" DFG_ENABLED............................. := True\n",
" DFG_PLOT................................ := True\n",
" DFG_MAX_HIER_LEVEL...................... := 100\n",
" DFG_INPUT_NODE_PATTERNS................. := ^/input_\n",
" DFG_OUTPUT_NODE_PATTERNS................ := ^/output_\n",
" DFG_EXCL_NODE_PATTERNS.................. := ^/rviz2\n",
" E2E_ENABLED............................. := True\n",
" E2E_PLOT................................ := True\n",
" E2E_PLOT_TIMESTAMP...................... := 100\n",
" E2E_TIME_LIMIT_S........................ := 500\n",
" E2E_OUTPUT_TOPIC_PATTERNS............... := ^/output/\n",
" E2E_INPUT_TOPIC_PATTERNS................ := ^/input/\n",
" E2E_EXCL_PATH_PATTERNS.................. := ^/parameter_events\n",
" E2E_INCL_PATH_PATTERNS.................. := \n",
" E2E_EXACT_PATH.......................... := \n",
" DEBUG................................... := False\n",
" MANUAL_CACHE............................ := False\n"
]
}
],
"source": [
"##################################################\n",
"# User Settings\n",
"##################################################\n",
"# Change these to influence the execution of the\n",
"# notebook.\n",
"# You can override these from the command line\n",
"# by defining environment variables with the\n",
"# name of the constants below, prefixed with\n",
"# \"ANA_NB_\".\n",
"# For example, the environment variable\n",
"# \"ANA_NB_TR_PATH\" will override the \"TR_PATH\"\n",
"# setting below.\n",
"##################################################\n",
"\n",
"# The path to the build folder of a ROS2 workspace that contains the\n",
"# tracetools_read and tracetools_analysis folders.\n",
"TRACING_WS_BUILD_PATH = \"/home/niklas/dataflow-analysis/dependencies/build\"\n",
"\n",
"# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n",
"# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n",
"# if kernel traces are also present.\n",
"TR_PATH = \"/home/niklas/dataflow-analysis/traces/full_topology_tracing-20250603152153/ust\"\n",
"\n",
"# Path to the folder all artifacts from this notebook are saved to.\n",
"# This entails plots as well as data tables.\n",
"OUT_PATH = \"out/\"\n",
"\n",
"# Whether to cache the results of long computations per set of inputs\n",
"CACHING_ENABLED = False\n",
"\n",
"# Whether to annotate topics/publications with bandwidth/message size\n",
"BW_ENABLED = False\n",
"# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n",
"# Used to annotate message sizes in E2E latency calculations\n",
"BW_PATH = \"path/to/messages.h5\"\n",
"\n",
"# Whether to use dependencies extracted by the Clang-tools to supplement\n",
"# automatic node-internal data flow annotations.\n",
"# If in doubt, set to False.\n",
"CL_ENABLED = False\n",
"# Path to the output directory of the ROS2 dependency checker.\n",
"# Will only be used if CL_ENABLED is True.\n",
"CL_PATH = \"/path/to/code_analysis/output\"\n",
"\n",
"# Whether to compute data flow graphs.\n",
"# If you are only interested in E2E latencies, set this to False\n",
"DFG_ENABLED = True\n",
"# Whether to plot data flow graphs (ignored if DFG_ENABLED is False)\n",
"DFG_PLOT = True\n",
"\n",
"# The maximum node namespace hierarchy level to be plotted.\n",
"# Top-level (1): e.g. /sensing, /control, etc.\n",
"# Level 3: e.g. /sensing/lidar/pointcloud_processor\n",
"DFG_MAX_HIER_LEVEL = 100\n",
"\n",
"# RegEx pattern for nodes that shall be marked as system inputs\n",
"# These will be plotted with a start arrow as known from automata diagrams\n",
"DFG_INPUT_NODE_PATTERNS = [r\"^/input_\"]\n",
"# RegEx pattern for nodes that shall be marked as system outputs\n",
"# These will be plotted with a double border\n",
"DFG_OUTPUT_NODE_PATTERNS = [r\"^/output_\"]\n",
"# RegEx for nodes which shall not be plotted in the DFG\n",
"DFG_EXCL_NODE_PATTERNS = [r\"^/rviz2\"]\n",
"\n",
"# Whether to compute E2E latencies.\n",
"E2E_ENABLED = True\n",
"# Whether to plot end-to-end latency information (ignored if E2E_ENABLED is False)\n",
"E2E_PLOT = True\n",
"# The index of the output message that shall be used in plots that visualize a specific\n",
"# message dependency tree. This index has to be 0 <= n < #output messages\n",
"E2E_PLOT_TIMESTAMP = 100\n",
"# E2E latency threshold. Every E2E latency higher than this is discarded.\n",
"# Set this as low as comfortably possible to speed up calculations.\n",
"# WARNING: If you set this too low (i.e. to E2E latencies that plausibly can happen)\n",
"# your results will be wrong)\n",
"E2E_TIME_LIMIT_S = 500\n",
"\n",
"# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n",
"# E.g. r\"^/control/\" will cover all control topics\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/output/\"]\n",
"# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n",
"# E.g. r\"^/sensing/\" will cover all sensing topics\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/input/\"]\n",
"\n",
"# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n",
"# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n",
"E2E_EXCL_PATH_PATTERNS = [r\"^/parameter_events\"]\n",
"\n",
"# To specify paths of interest, topic/callback name patterns that HAVE TO OCCUR in each E2E path can be specified as RegEx here.\n",
"#E2E_INCL_PATH_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\", r\"^/sensing/.*?pointcloud\"]\n",
"E2E_INCL_PATH_PATTERNS = []\n",
"\n",
"# If an exact path through the system is known, this variabe can be set to a list (order matters!) of all elements on the path.\n",
"# The first item ist the one at the \"input end\" of the system, the last one the \"output end\".\n",
"E2E_EXACT_PATH = []\n",
"\n",
"# For development purposes only. Leave this at False.\n",
"DEBUG = False\n",
"\n",
"# For development purposes only. Leave this at False.\n",
"MANUAL_CACHE = False\n",
"\n",
"##################################################\n",
"# End of User Settings\n",
"##################################################\n",
"\n",
"# This code overrides the above constants with environment variables, with values from the environment.\n",
"# Values will be parsed through Python's literal_eval() function. Thus, strings have to be specified\n",
"# including \" or '.\n",
"for env_key, env_value in os.environ.items():\n",
" if env_key.startswith(\"ANA_NB_\"):\n",
" key = env_key.removeprefix(\"ANA_NB_\")\n",
" if key not in globals().keys():\n",
" continue\n",
" value = parse_as(type(globals()[key]), env_value)\n",
" globals()[key] = value\n",
"\n",
"\n",
"# Convert input paths to absolute paths\n",
"def _expand_path(path):\n",
" return os.path.realpath(os.path.expandvars(os.path.expanduser(path)))\n",
"\n",
"\n",
"TRACING_WS_BUILD_PATH = _expand_path(TRACING_WS_BUILD_PATH)\n",
"TR_PATH = _expand_path(TR_PATH)\n",
"OUT_PATH = _expand_path(OUT_PATH)\n",
"BW_PATH = _expand_path(BW_PATH)\n",
"CL_PATH = _expand_path(CL_PATH)\n",
"\n",
"os.makedirs(OUT_PATH, exist_ok=True)\n",
"\n",
"# Print parsed user settings\n",
"print(\"User Settings:\")\n",
"for k, v in globals().copy().items():\n",
" if not k.isupper():\n",
" continue\n",
" if isinstance(v, Iterable) and not isinstance(v, str):\n",
" v = (\"\\n \" + (\" \" * 44)).join(list(map(str, v)))\n",
" print(f\" {k:.<40s} := {v}\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# The last few imports can only be resolved using the user-specified paths above\n",
"sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_read/\"))\n",
"sys.path.append(os.path.join(TRACING_WS_BUILD_PATH, \"tracetools_analysis/\"))\n",
"# noinspection PyUnresolvedReferences\n",
"from tracetools_read.trace import *\n",
"# noinspection PyUnresolvedReferences\n",
"from tracetools_analysis.loading import load_file\n",
"# noinspection PyUnresolvedReferences\n",
"from tracetools_analysis.processor.ros2 import Ros2Handler\n",
"\n",
"from tracing_interop.tr_types import *"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Load Trace Data\n",
"\n",
"Load (and, if necessary, convert) tracing data obtained through ros2_tracing. Build indices for fast analysis."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[CACHE] Cache disabled for tr_objects.\n",
"found converted file: /home/niklas/dataflow-analysis/traces/full_topology_tracing-20250603152153/ust/converted\n",
" [100%] [Ros2Handler]\n",
"[TrContext] Processing ROS 2 objects from traces...\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" ├─ Processing TrNodes: 100%|██████████| 16/16 [00:00<00:00, 189573.06it/s]\n",
" ├─ Processing TrPublishers: 100%|██████████| 48/48 [00:00<00:00, 352585.98it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"[DEBUG] Duplicate Indices in id\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" ├─ Processing TrSubscriptions: 100%|██████████| 31/31 [00:00<00:00, 183131.58it/s]\n",
" ├─ Processing TrTimers: 100%|██████████| 6/6 [00:00<00:00, 92521.41it/s]\n",
" ├─ Processing TrTimerNodeLinks: 100%|██████████| 6/6 [00:00<00:00, 107088.61it/s]\n",
" ├─ Processing TrSubscriptionObjects: 100%|██████████| 31/31 [00:00<00:00, 331692.41it/s]\n",
" ├─ Processing TrCallbackObjects: 100%|██████████| 133/133 [00:00<00:00, 617082.34it/s]\n",
" ├─ Processing TrCallbackSymbols: 100%|██████████| 133/133 [00:00<00:00, 238394.20it/s]\n",
" ├─ Processing TrPublishInstances: 100%|██████████| 20207/20207 [00:00<00:00, 268376.27it/s]\n",
" ├─ Processing TrCallbackInstances: 100%|██████████| 20311/20311 [00:00<00:00, 293063.09it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Done.\n"
]
}
],
"source": [
"def _load_traces():\n",
" file = load_file(TR_PATH)\n",
" handler = Ros2Handler.process(file)\n",
" return TrContext(handler)\n",
"\n",
"\n",
"_tracing_context = cached(\"tr_objects\", _load_traces, [TR_PATH], not CACHING_ENABLED)\n",
"_tr_globals = [\"nodes\", \"publishers\", \"subscriptions\", \"timers\", \"timer_node_links\", \"subscription_objects\",\n",
" \"callback_objects\", \"callback_symbols\", \"publish_instances\", \"callback_instances\", \"topics\"]\n",
"\n",
"# Help the IDE recognize those identifiers by assigning a dummy value to their name.\n",
"nodes: Index = Index([])\n",
"publishers: Index = Index([])\n",
"subscriptions: Index = Index([])\n",
"timers: Index = Index([])\n",
"timer_node_links: Index = Index([])\n",
"subscription_objects: Index = Index([])\n",
"callback_objects: Index = Index([])\n",
"callback_symbols: Index = Index([])\n",
"publish_instances: Index = Index([])\n",
"callback_instances: Index = Index([])\n",
"topics: Index = Index([])\n",
"\n",
"for name in _tr_globals:\n",
" globals()[name] = getattr(_tracing_context, name)\n",
"\n",
"print(\"Done.\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"/camera/debayered................................................................................................................. | 499 msgs\n",
"/camera/geometric................................................................................................................. | 499 msgs\n",
"/camera/radiometric............................................................................................................... | 499 msgs\n",
"/flight/plan...................................................................................................................... | 3596 msgs\n",
"/input/baro/alt................................................................................................................... | 601 msgs\n",
"/input/camera/raw................................................................................................................. | 499 msgs\n",
"/input/gps/fix.................................................................................................................... | 703 msgs\n",
"/input/imu/data................................................................................................................... | 499 msgs\n",
"/input/lidar/scan................................................................................................................. | 998 msgs\n",
"/input/operator/commands.......................................................................................................... | 798 msgs\n",
"/output/camera/mapped............................................................................................................. | 499 msgs\n",
"/output/classifier/classification................................................................................................. | 499 msgs\n",
"/output/flight/cmd................................................................................................................ | 3595 msgs\n",
"/output/telemetry/radio........................................................................................................... | 2302 msgs\n",
"/parameter_events................................................................................................................. | 16 msgs\n",
"/rosout........................................................................................................................... | 0 msgs\n",
"/sensors/fused.................................................................................................................... | 1803 msgs\n",
"/telemetry/data................................................................................................................... | 2302 msgs\n",
"\n",
"[DEBUG] INPUT TOPICS\n",
"--[DEBUG] ^/input/ :/input/baro/alt.......................................................................... | 601 msgs\n",
"--[DEBUG] ^/input/ :/input/camera/raw........................................................................ | 499 msgs\n",
"--[DEBUG] ^/input/ :/input/gps/fix........................................................................... | 703 msgs\n",
"--[DEBUG] ^/input/ :/input/imu/data.......................................................................... | 499 msgs\n",
"--[DEBUG] ^/input/ :/input/lidar/scan........................................................................ | 998 msgs\n",
"--[DEBUG] ^/input/ :/input/operator/commands................................................................. | 798 msgs\n",
"\n",
"[DEBUG] OUTPUT TOPICS\n",
"--[DEBUG] ^/output/ :/output/camera/mapped.................................................................... | 499 msgs\n",
"--[DEBUG] ^/output/ :/output/classifier/classification........................................................ | 499 msgs\n",
"--[DEBUG] ^/output/ :/output/flight/cmd....................................................................... | 3595 msgs\n",
"--[DEBUG] ^/output/ :/output/telemetry/radio.................................................................. | 2302 msgs\n"
]
}
],
"source": [
"##################################################\n",
"# Print (All/Input/Output) Topic Message Counts\n",
"##################################################\n",
"\n",
"for topic in sorted(topics, key=lambda t: t.name):\n",
" topic: TrTopic\n",
" print(f\"{topic.name:.<130s} | {sum(map(lambda p: len(p.instances), topic.publishers)):>5d} msgs\")\n",
"\n",
"print(\"\\n[DEBUG] INPUT TOPICS\")\n",
"for t in sorted(topics, key=lambda t: t.name):\n",
" for f in E2E_INPUT_TOPIC_PATTERNS:\n",
" if re.search(f, t.name):\n",
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")\n",
"\n",
"print(\"\\n[DEBUG] OUTPUT TOPICS\")\n",
"for t in sorted(topics, key=lambda t: t.name):\n",
" for f in E2E_OUTPUT_TOPIC_PATTERNS:\n",
" if re.search(f, t.name):\n",
" print(f\"--[DEBUG] {f:<30s}:{t.name:.<89s} | {sum(map(lambda p: len(p.instances), t.publishers)):>5d} msgs\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Analyze ROS Graph\n",
"Reconstruct namespace hierarchy, data flow graph between callbacks."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[CACHE] Cache disabled for lat_graph.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Finding CB nodes: 100%|██████████| 133/133 [00:00<00:00, 299111.22it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"[DEBUG] 96 callbacks have no owner, filtering them out.\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Processing node publications: 100%|██████████| 16/16 [00:00<00:00, 278.22it/s]\n",
"Processing CB subscriptions: 100%|██████████| 37/37 [00:00<00:00, 615.30it/s]\n",
"Building graph edges: 100%|██████████| 18/18 [00:00<00:00, 160975.42it/s]\n",
"Building graph nodes: 100%|██████████| 37/37 [00:00<00:00, 2422.60it/s]\n"
]
}
],
"source": [
"import latency_graph.latency_graph_structure as lg\n",
"\n",
"\n",
"def _make_latency_graph():\n",
" return lg.LatencyGraph(_tracing_context)\n",
"\n",
"\n",
"lat_graph = cached(\"lat_graph\", _make_latency_graph, [TR_PATH], not CACHING_ENABLED)"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"## Plot ROS Graph (full)\n",
"Plot the DFG hierarchically by node namespace. Plot each internal and external dependency between callbacks as one arrow."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" Processing INPUT: 1\n",
" Processing OUTPUT: 1\n",
" Processing input_camera_node: 2\n",
" Processing debayer_node: 2\n",
" Processing radiometric_node: 2\n",
" Processing geometric_node: 2\n",
" Processing output_mapping_node: 2\n",
" Processing output_smoke_classifier_node: 2\n",
" Processing input_gps_node: 2\n",
" Processing input_imu_node: 2\n",
" Processing input_baro_node: 2\n",
" Processing sensor_fusion_node: 4\n",
" Processing input_lidar_node: 2\n",
" Processing input_operator_cmd_node: 2\n",
" Processing flight_mgmt_node: 4\n",
" Processing output_flight_control_node: 2\n",
" Processing telemetry_node: 3\n",
" Processing output_radio_tx_node: 2\n"
]
},
{
"data": {
"image/svg+xml": [
"\n",
"\n",
"\n",
"\n",
"\n"
],
"text/plain": [
""
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"#################################################\n",
"# Plot full DFG, down to the callback level\n",
"#################################################\n",
"\n",
"from latency_graph.latency_graph_plots import plot_latency_graph_full\n",
"\n",
"plot_latency_graph_full(lat_graph, _tracing_context, os.path.join(OUT_PATH, \"latency_graph_full\"))"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"## Plot Latency Graph (overview)\n",
"Plot the DFG down to a certain hierarchy level in a flattened manner. Aggregate dependencies from multiple callbacks into corresponding node-node dependencies."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false,
"pycharm": {
"name": "#%%%%skip_if_false DFG_ENABLED\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Plotting DFG overview with max hierarchy level 100...\n",
"Input Node Patterns: ['^/input_']\n",
"Output Node Patterns: ['^/output_']\n",
"Excluded Node Patterns: ['^/rviz2']\n",
"Input Nodes: /input_camera_node, /input_gps_node, /input_imu_node, /input_baro_node, /input_lidar_node, /input_operator_cmd_node\n",
"Output Nodes: /output_mapping_node, /output_smoke_classifier_node, /output_flight_control_node, /output_radio_tx_node\n",
"Intermediate Nodes: /INPUT, /OUTPUT, /input_camera_node, /debayer_node, /radiometric_node, /geometric_node, /output_mapping_node, /output_smoke_classifier_node, /input_gps_node, /input_imu_node, /input_baro_node, /sensor_fusion_node, /input_lidar_node, /input_operator_cmd_node, /flight_mgmt_node, /output_flight_control_node, /telemetry_node, /output_radio_tx_node\n",
"/input_lidar_node /flight_mgmt_node 1\n",
"/input_gps_node /sensor_fusion_node 1\n",
"/input_operator_cmd_node /flight_mgmt_node 1\n",
"/debayer_node /radiometric_node 1\n",
"/radiometric_node /geometric_node 1\n",
"/radiometric_node /output_smoke_classifier_node 1\n",
"/input_baro_node /sensor_fusion_node 1\n",
"/flight_mgmt_node /output_flight_control_node 3\n",
"/input_camera_node /debayer_node 1\n",
"/output_mapping_node /telemetry_node 1\n",
"/telemetry_node /output_radio_tx_node 2\n",
"/sensor_fusion_node /telemetry_node 3\n",
"/sensor_fusion_node /flight_mgmt_node 3\n",
"/input_imu_node /sensor_fusion_node 1\n",
"/geometric_node /output_mapping_node 1\n"
]
},
{
"data": {
"image/svg+xml": [
"\n",
"\n",
"\n",
"\n",
"\n"
],
"text/plain": [
""
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%skip_if_false DFG_ENABLED\n",
"%%skip_if_false DFG_PLOT\n",
"\n",
"#################################################\n",
"# Plot overview of DFG, down to a certain\n",
"# hierarchy level\n",
"#################################################\n",
"\n",
"from latency_graph.latency_graph_plots import plot_latency_graph_overview\n",
"\n",
"print(f\"Plotting DFG overview with max hierarchy level {DFG_MAX_HIER_LEVEL}...\")\n",
"print(f\"Input Node Patterns: {DFG_INPUT_NODE_PATTERNS}\")\n",
"print(f\"Output Node Patterns: {DFG_OUTPUT_NODE_PATTERNS}\")\n",
"print(f\"Excluded Node Patterns: {DFG_EXCL_NODE_PATTERNS}\")\n",
"\n",
"plot_latency_graph_overview(lat_graph, DFG_EXCL_NODE_PATTERNS, DFG_INPUT_NODE_PATTERNS, DFG_OUTPUT_NODE_PATTERNS, DFG_MAX_HIER_LEVEL, os.path.join(OUT_PATH, \"latency_graph_overview\"))"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false
},
"source": [
"# Analyze Message Flow\n",
"Build dependency trees ending in the specified output topics."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 4 end topics for E2E latency calculations: /output/telemetry/radio, /output/camera/mapped, /output/classifier/classification, /output/flight/cmd\n",
"Using 133 callback objects, 20311 callback instances, and 20207 publish instances for E2E latency calculations.\n",
"[CACHE] Cache disabled for trees.\n",
"=====/output/telemetry/radio\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Processing output messages: 100%|██████████| 2302/2302 [00:11<00:00, 203.94it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 2302 trees for topic /output/telemetry/radio\n",
"=====/output/camera/mapped\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Processing output messages: 100%|██████████| 499/499 [00:00<00:00, 7355.95it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 499 trees for topic /output/camera/mapped\n",
"=====/output/classifier/classification\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Processing output messages: 100%|██████████| 499/499 [00:00<00:00, 1005.12it/s]\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 499 trees for topic /output/classifier/classification\n",
"=====/output/flight/cmd\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Processing output messages: 100%|██████████| 3595/3595 [00:09<00:00, 360.81it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 3595 trees for topic /output/flight/cmd\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from message_tree.message_tree_algorithms import build_dep_trees\n",
"\n",
"end_topics = [t for t in _tracing_context.topics if any(re.search(f, t.name) for f in E2E_OUTPUT_TOPIC_PATTERNS)]\n",
"\n",
"print(f\"Found {len(end_topics)} end topics for E2E latency calculations: {', '.join(t.name for t in end_topics)}\")\n",
"\n",
"print(f\"Using {len(_tracing_context.callback_objects)} callback objects, {len(_tracing_context.callback_instances)} callback instances, and {len(_tracing_context.publish_instances)} publish instances for E2E latency calculations.\")\n",
"\n",
"def _build_dep_trees():\n",
" return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S)\n",
" #return build_dep_trees(end_topics, lat_graph, _tracing_context, E2E_EXCL_PATH_PATTERNS, E2E_TIME_LIMIT_S, exact_path=E2E_EXACT_PATH)\n",
"\n",
"try:\n",
" trees = cached(\"trees\", _build_dep_trees, [TR_PATH], not CACHING_ENABLED)\n",
"except Exception as e:\n",
" import traceback\n",
" print(e)\n",
" traceback.print_exc()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"'Skipped (evaluated BW_ENABLED to False)'"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false BW_ENABLED\n",
"\n",
"##################################################\n",
"# Get message bandwidths from `ros2 multitopic bw`\n",
"# output.\n",
"##################################################\n",
"\n",
"from bw_interop.process_bw_output import get_topic_messages\n",
"msgs = get_topic_messages(BW_PATH)\n",
"\n",
"from bw_interop.bw_plots import dds_lat_msg_size_scatter\n",
"plot_topic = \"\""
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Dependency Trees:\n",
"\n",
"Tree 1/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /sensors/fused\n",
" Callback Instance: FusionNode::FusionNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /input/gps/fix\n",
" Callback Instance: GPSNode::GPSNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda()#1}\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
"\n",
"Tree 2/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /sensors/fused\n",
" Callback Instance: FusionNode::FusionNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /input/baro/alt\n",
" Callback Instance: BaroNode::BaroNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda()#1}\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
"\n",
"Tree 3/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /output/camera/mapped\n",
" Callback Instance: MappingNode::MappingNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /camera/geometric\n",
" Callback Instance: GeometricNode::GeometricNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /camera/radiometric\n",
" Callback Instance: RadiometricNode::RadiometricNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /camera/debayered\n",
" Callback Instance: DebayerNode::DebayerNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /input/camera/raw\n",
" Callback Instance: CameraNode::CameraNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda()#1}\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
"\n",
"Tree 4/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /sensors/fused\n",
" Callback Instance: FusionNode::FusionNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /input/imu/data\n",
" Callback Instance: IMUNode::IMUNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda()#1}\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
"\n",
"Tree 5/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /sensors/fused\n",
" Callback Instance: FusionNode::FusionNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /input/gps/fix\n",
" Callback Instance: GPSNode::GPSNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda()#1}\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
"\n",
"Tree 6/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /sensors/fused\n",
" Callback Instance: FusionNode::FusionNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /input/baro/alt\n",
" Callback Instance: BaroNode::BaroNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda()#1}\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
" Callback Instance: std::_Bind))(std::shared_ptr > >)>\n",
"\n",
"Tree 7/6895:\n",
"Publish Instance: /output/telemetry/radio\n",
"Callback Instance: RadioNode::RadioNode(std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /telemetry/data\n",
" Callback Instance: TelemetryNode::TelemetryNode(std::__cxx11::basic_string, std::allocator > const&, std::vector, std::allocator >, std::allocator, std::allocator > > > const&, std::__cxx11::basic_string, std::allocator > const&, int, double)::{lambda(std::shared_ptr > >)#1}\n",
" Publish Instance: /output/camera/mapped\n",
" Callback Instance: MappingNode::MappingNode(std::__cxx11::basic_string, std::allocator