diff --git a/bw_interop/__init__.py b/bw_interop/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/bw_interop/bw_plots.py b/bw_interop/bw_plots.py
new file mode 100644
index 0000000..94cae75
--- /dev/null
+++ b/bw_interop/bw_plots.py
@@ -0,0 +1,16 @@
+import matplotlib.pyplot as plt
+import numpy as np
+
+
+def dds_lat_msg_size_scatter(topic_name, topic_dds_latencies, topic_msg_sizes):
+    plt.close("dds_lat_msg_size_scatter")
+    fig, ax = plt.subplots(num="dds_lat_msg_size_scatter", dpi=300, figsize=(15, 7))
+    ax: plt.Axes
+    fig: plt.Figure
+    fig.suptitle(f"Correlation of Message Size and DDS Latency\nfor {topic_name}")
+
+    ax.scatter(topic_msg_sizes, np.array(topic_dds_latencies) * 1e6)
+    ax.set_xlabel("Message Size [B]")
+    ax.set_ylabel("DDS Latency [µs]")
+
+    return fig
diff --git a/bw_interop/process_bw_output.py b/bw_interop/process_bw_output.py
new file mode 100644
index 0000000..b55cbac
--- /dev/null
+++ b/bw_interop/process_bw_output.py
@@ -0,0 +1,45 @@
+import pandas as pd
+import tables as tb
+import numpy as np
+
+
+def bytes_str(bytes):
+    if bytes >= 1024**2:
+        return f"{bytes/(1024**2):.2f} MiB"
+    if bytes >= 1024:
+        return f"{bytes/1024:.2f} KiB"
+    return f"{bytes:.0f} B"
+
+
+def get_topic_messages(h5_filename: str):
+    topic_messages = {}
+
+    with tb.open_file(h5_filename, root_uep="/messages") as f:
+        for node in f.list_nodes("/"):
+            topic = node.title
+            messages = pd.DataFrame.from_records(node[:])
+            topic_messages[topic] = messages
+            # if len(messages) >= 2:
+            #     total_data = np.sum(messages["size"])
+            #     print(f'{len(messages):>5d}m, {bytes_str(total_data):>10s}, '
+            #           f'{bytes_str(total_data / (np.max(messages["timestamp"]) - np.min(messages["timestamp"]))):>10s}/s, '
+            #           f'{topic}')
+
+    return topic_messages
+
+
+def get_topic_stats(topics_dict: dict):
+    records = []
+    for topic, messages in topics_dict.items():
+        total_data = np.sum(messages["size"])
+        records.append({
+            "topic": topic,
+            "count": len(messages),
+            "total_data": total_data,
+            "bandwidth": total_data / (np.max(messages["timestamp"]) - np.min(messages["timestamp"])),
+            "min_size": np.min(messages["size"]),
+            "avg_size": np.mean(messages["size"]),
+            "max_size": np.max(messages["size"])
+        })
+
+    return pd.DataFrame.from_records(records)
diff --git a/message_tree/message_tree_algorithms.py b/message_tree/message_tree_algorithms.py
index 33c73f3..47c4111 100644
--- a/message_tree/message_tree_algorithms.py
+++ b/message_tree/message_tree_algorithms.py
@@ -260,10 +260,10 @@ def e2e_paths_sorted_desc(tree: DepTree, input_topic_patterns):
             return None
 
         if i == 0:
-            #print(end=".")
-            return None
+            # print(end=".")
+            return path  # Return whole path, even if there is no publishing callback
 
         if not isinstance(path[i - 1], TrCallbackInstance):
-            #print(end="#")
+            # print(end="#")
             return None
         return path[i - 1:]  # Return path from its publishing callback if it exists
@@ -295,11 +295,13 @@ def e2e_latency_breakdown(path: list):
             case TrCallbackInstance() as cb_inst:
                 match last_inst:
                     case TrCallbackInstance() as cb_inst_prev:
+                        ret_list.append(E2EBreakdownItem("cpu", cb_inst_prev.duration,
+                                                         (cb_inst_prev, cb_inst_prev)))
                         ret_list.append(E2EBreakdownItem("idle", cb_inst.t_start - cb_inst_prev.t_end,
                                                          (cb_inst_prev, cb_inst)))
                     case TrPublishInstance() as pub_inst_prev:
                         ret_list.append(E2EBreakdownItem("dds", cb_inst.t_start - pub_inst_prev.timestamp,
-                                                         (pub_inst_prev, inst)))
+                                                         (pub_inst_prev, cb_inst)))
             case TrPublishInstance() as pub_inst:
                 match last_inst:
                     case TrCallbackInstance() as cb_inst_prev:
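A minimal usage sketch of the new bw_interop helpers above (illustrative only, not part of the patch): the HDF5 path and topic name are placeholders, and the per-message DDS latencies are dummy values, since this diff does not show how they are derived from the trace. The "size" and "timestamp" columns are the ones the module itself relies on.

from bw_interop.process_bw_output import get_topic_messages, get_topic_stats
from bw_interop.bw_plots import dds_lat_msg_size_scatter

# Load per-topic message records: {topic: DataFrame with "timestamp", "size", ...}
msgs = get_topic_messages("data/messages-x86.h5")  # placeholder path

# Per-topic message count, bandwidth and min/avg/max message size
stats = get_topic_stats(msgs)
print(stats.sort_values("bandwidth", ascending=False).head(10))

# Scatter message size against (dummy) per-message DDS latency for one topic
topic = "/sensing/lidar/top/pointcloud_raw"  # placeholder topic name
sizes = msgs[topic]["size"]
dds_latencies = [0.0005] * len(sizes)  # dummy latencies in seconds
fig = dds_lat_msg_size_scatter(topic, dds_latencies, sizes)
fig.savefig("dds_lat_vs_size.png")

diff --git 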
a/trace-analysis.ipynb b/trace-analysis.ipynb index e05b69f..b871757 100644 --- a/trace-analysis.ipynb +++ b/trace-analysis.ipynb @@ -69,7 +69,7 @@ "# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n", "# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n", "# if kernel traces are also present.\n", - "# TR_PATH = \"/home/max/Downloads/iteration2_worker1/aw_replay/tracing/scenario-trace/ust\"\n", + "# TR_PATH = \"~/Downloads/iteration1_worker1/aw_replay/tracing/scenario-trace/ust\"\n", "TR_PATH = \"data/trace-awsim-x86/ust\"\n", "\n", "# Path to the folder all artifacts from this notebook are saved to.\n", @@ -80,10 +80,10 @@ "CACHING_ENABLED = False\n", "\n", "# Whether to annotate topics/publications with bandwidth/message size\n", - "BW_ENABLED = False\n", - "# Path to a results folder as output by ma-hw-perf-tools/messages/record.bash\n", + "BW_ENABLED = True\n", + "# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n", "# Used to annotate message sizes in E2E latency calculations\n", - "BW_PATH = \"../ma-hw-perf-tools/data/results\"\n", + "BW_PATH = \"../ma-hw-perf-tools/data/messages-x86.h5\"\n", "\n", "# Whether to use dependencies extracted by the Clang-tools to supplement\n", "# automatic node-internal data flow annotations.\n", @@ -128,10 +128,10 @@ "\n", "# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n", "# E.g. r\"^/control/\" will cover all control topics\n", - "E2E_OUTPUT_TOPIC_PATTERNS = [r\"emergency/control_cmd\"]\n", + "E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/command/control_cmd$\"]\n", "# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n", "# E.g. 
r\"^/sensing/\" will cover all sensing topics\n", - "E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/(lidar/[^c]|[^l])\"]\n", + "E2E_INPUT_TOPIC_PATTERNS = [r\"^/sensing/.*?pointcloud\"]\n", "\n", "# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n", "# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n", @@ -236,7 +236,6 @@ "execution_count": null, "outputs": [], "source": [ - "\n", "for topic in sorted(topics, key=lambda t: t.name):\n", " topic: TrTopic\n", " print(f\"{topic.name:.<120s} | {sum(map(lambda p: len(p.instances), topic.publishers))}\")\n", @@ -388,50 +387,11 @@ "%%skip_if_false E2E_ENABLED\n", "%%skip_if_false BW_ENABLED\n", "\n", + "from bw_interop.process_bw_output import get_topic_messages\n", + "msgs = get_topic_messages(BW_PATH)\n", "\n", - "def parse_bytes(string):\n", - " match string[-1]:\n", - " case 'K':\n", - " exponent = 1e3\n", - " case 'M':\n", - " exponent = 1e6\n", - " case _:\n", - " exponent = 1\n", - "\n", - " num = float(string.split(\" \")[0])\n", - " return num * exponent\n", - "\n", - "\n", - "def bytes_str(bytes):\n", - " if bytes >= 1024 ** 2:\n", - " return f\"{bytes / (1024 ** 2):.2f} MiB\"\n", - " if bytes >= 1024:\n", - " return f\"{bytes / 1024:.2f} KiB\"\n", - " return f\"{bytes:.0f} B\"\n", - "\n", - "\n", - "bw_files = glob.glob(os.path.join(BW_PATH, \"*.log\"))\n", - "msg_sizes = {}\n", - "for bw_file in bw_files:\n", - " with open(bw_file) as f:\n", - " lines = f.readlines()\n", - " topic = os.path.splitext(os.path.split(bw_file)[1])[0].replace(\"__\", \"/\")\n", - "\n", - " if not lines or re.match(f\"^\\s*$\", lines[-1]):\n", - " #print(f\"No data for {topic}\")\n", - " continue\n", - "\n", - " line_pattern = re.compile(\n", - " r\"(?P[0-9.]+ [KM]?)B/s from (?P[0-9.]+) messages --- Message size mean: (?P[0-9.]+ [KM]?)B min: (?P[0-9.]+ [KM]?)B max: (?P[0-9.]+ [KM]?)B\\n\")\n", - " m = re.fullmatch(line_pattern, lines[-1])\n", - " if m is None:\n", - " print(f\"Line could not be parsed in {topic}: '{lines[-1]}'\")\n", - " continue\n", - "\n", - " msg_sizes[topic] = {'bw': parse_bytes(m.group(\"bw\")),\n", - " 'min': parse_bytes(m.group(\"min\")),\n", - " 'mean': parse_bytes(m.group(\"mean\")),\n", - " 'max': parse_bytes(m.group(\"max\"))}" + "from bw_interop.bw_plots import dds_lat_msg_size_scatter\n", + "plot_topic = \"\"" ], "metadata": { "collapsed": false @@ -511,11 +471,147 @@ "\n", "from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n", "from message_tree.message_tree_plots import e2e_breakdown_type_hist\n", + "from message_tree.message_tree_algorithms import owner\n", + "\n", "\n", "trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0,\n", " desc=\"Extracting E2E paths\")]\n", "all_paths = [p for paths in trees_paths for p in paths]\n", - "#all_e2e_items = [i for p in all_paths for i in p]" + "# all_e2e_items = [i for p in all_paths for i in p]\n", + "# print(trees[0])\n", + "\n", + "lidar_paths = [p for p in all_paths if any(map(lambda inst: re.search(\"^/sensing/.*?pointcloud\", owner(inst)), p))]\n" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "%%skip_if_false E2E_ENABLED\n", + "\n", + "from message_tree.message_tree_algorithms import aggregate_e2e_paths\n", + "\n", + "cohorts = 
aggregate_e2e_paths(lidar_paths) #all_paths)\n", + "cohort_pairs = [(k, v) for k, v in cohorts.items()]\n", + "cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n", + "\n", + "path_records = [{\"path\": path_key,\n", + " \"timestamp\": path[-1].timestamp,\n", + " \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n", + " for path_key, paths in cohort_pairs for path in paths if path]\n", + "\n", + "out_df = pd.DataFrame.from_records(path_records)\n", + "out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n", + "\n", + "df_print = out_df[['path', 'e2e_latency']].groupby(\"path\").agg(['count', 'mean', 'min', 'max']).reset_index()\n", + "df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n", + "df_print = df_print.sort_values((\"e2e_latency\", \"count\"), ascending=False)\n", + "df_print.to_csv(os.path.join(OUT_PATH, \"e2e_overview.csv\"), sep=\"\\t\", index=False)\n", + "df_print" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "import pickle\n", + "#with open(\"state.pkl\", \"wb\") as f:\n", + "# pickle.dump((trees_paths, all_paths, lidar_paths, cohorts), f)\n", + "with open(\"state.pkl\", \"rb\") as f:\n", + " (trees_paths, all_paths, lidar_paths, cohorts) = pickle.load(f)" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "COHORT_EXCL_PATTERNS = [\"hazard\", \"turn_indicator\", \"gear_cmd\", \"emergency_cmd\", \"external_cmd\", \"/control/operation_mode\",\n", + " \"/planning/scenario_planning/scenario$\"]\n", + "COHORT_INCL_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\"]\n", + "\n", + "cohorts_filt = {k: v for k, v in cohorts.items()\n", + " if not any(re.search(f, k) for f in COHORT_EXCL_PATTERNS) and all(re.search(f, k) for f in COHORT_INCL_PATTERNS)}\n", + "\n", + "\n", + "print(len(cohorts), len(cohorts_filt))\n", + "for k, v in cohorts_filt.items():\n", + " print(f\"\\n\\n ({len(v)})\\n \", end=\"\")\n", + " print(\"\\n -> \".join(k.split(\" -> \")))\n", + "\n", + "lidar_chain, lidar_cohort = next(iter(cohorts_filt.items()))" + ], + "metadata": { + "collapsed": false + } + }, + { + "cell_type": "code", + "execution_count": null, + "outputs": [], + "source": [ + "def e2e_latency_breakdown__(path: list):\n", + " \"\"\"\n", + " Separates E2E latency into a sequence of dds, idle, and cpu times.\n", + " This method expects a publish instance at the last position in `path`.\n", + "\n", + " The return format is a list of the form [(\"\",