Fix message tree calculations, add plots for the paper, start work on BW plots

Maximilian Schmeller 2022-10-28 22:37:48 +09:00
parent 65c21fb6ce
commit a1369890bf
5 changed files with 342 additions and 73 deletions

@@ -69,7 +69,7 @@
"# Path to trace directory (e.g. ~/.ros/my-trace/ust) or to a converted trace file.\n",
"# Using the path \"/ust\" at the end is optional but greatly reduces processing time\n",
"# if kernel traces are also present.\n",
"# TR_PATH = \"/home/max/Downloads/iteration2_worker1/aw_replay/tracing/scenario-trace/ust\"\n",
"# TR_PATH = \"~/Downloads/iteration1_worker1/aw_replay/tracing/scenario-trace/ust\"\n",
"TR_PATH = \"data/trace-awsim-x86/ust\"\n",
"\n",
"# Path to the folder all artifacts from this notebook are saved to.\n",
@@ -80,10 +80,10 @@
"CACHING_ENABLED = False\n",
"\n",
"# Whether to annotate topics/publications with bandwidth/message size\n",
"BW_ENABLED = False\n",
"# Path to a results folder as output by ma-hw-perf-tools/messages/record.bash\n",
"BW_ENABLED = True\n",
"# Path to a HDF5 file as output by ma-hw-perf-tools/messages/record.bash\n",
"# Used to annotate message sizes in E2E latency calculations\n",
"BW_PATH = \"../ma-hw-perf-tools/data/results\"\n",
"BW_PATH = \"../ma-hw-perf-tools/data/messages-x86.h5\"\n",
"\n",
"# Whether to use dependencies extracted by the Clang-tools to supplement\n",
"# automatic node-internal data flow annotations.\n",
@@ -128,10 +128,10 @@
"\n",
"# All topics containing any of these RegEx patterns are considered output topics in E2E latency calculations\n",
"# E.g. r\"^/control/\" will cover all control topics\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"emergency/control_cmd\"]\n",
"E2E_OUTPUT_TOPIC_PATTERNS = [r\"^/control/command/control_cmd$\"]\n",
"# All topics containing any of these RegEx patterns are considered input topics in E2E latency calculations\n",
"# E.g. r\"^/sensing/\" will cover all sensing topics\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/vehicle/status/\", r\"^/sensing/(lidar/[^c]|[^l])\"]\n",
"E2E_INPUT_TOPIC_PATTERNS = [r\"^/sensing/.*?pointcloud\"]\n",
"\n",
"# E2E paths are uniquely identified by a string like \"/topic/1 -> void(Node1)(args1) -> /topic/2 -> void(Node2)(args2) -> void(Node2)(args3) -> ...\".\n",
"# Certain patterns only occur in initial setup or in scenario switching and can be excluded via RegEx patterns here.\n",
@@ -236,7 +236,6 @@
"execution_count": null,
"outputs": [],
"source": [
"\n",
"for topic in sorted(topics, key=lambda t: t.name):\n",
" topic: TrTopic\n",
" print(f\"{topic.name:.<120s} | {sum(map(lambda p: len(p.instances), topic.publishers))}\")\n",
@@ -388,50 +387,11 @@
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false BW_ENABLED\n",
"\n",
"from bw_interop.process_bw_output import get_topic_messages\n",
"msgs = get_topic_messages(BW_PATH)\n",
"\n",
"def parse_bytes(string):\n",
" match string[-1]:\n",
" case 'K':\n",
" exponent = 1e3\n",
" case 'M':\n",
" exponent = 1e6\n",
" case _:\n",
" exponent = 1\n",
"\n",
" num = float(string.split(\" \")[0])\n",
" return num * exponent\n",
"\n",
"\n",
"def bytes_str(bytes):\n",
" if bytes >= 1024 ** 2:\n",
" return f\"{bytes / (1024 ** 2):.2f} MiB\"\n",
" if bytes >= 1024:\n",
" return f\"{bytes / 1024:.2f} KiB\"\n",
" return f\"{bytes:.0f} B\"\n",
"\n",
"\n",
"bw_files = glob.glob(os.path.join(BW_PATH, \"*.log\"))\n",
"msg_sizes = {}\n",
"for bw_file in bw_files:\n",
" with open(bw_file) as f:\n",
" lines = f.readlines()\n",
" topic = os.path.splitext(os.path.split(bw_file)[1])[0].replace(\"__\", \"/\")\n",
"\n",
" if not lines or re.match(f\"^\\s*$\", lines[-1]):\n",
" #print(f\"No data for {topic}\")\n",
" continue\n",
"\n",
" line_pattern = re.compile(\n",
" r\"(?P<bw>[0-9.]+ [KM]?)B/s from (?P<n_msgs>[0-9.]+) messages --- Message size mean: (?P<mean>[0-9.]+ [KM]?)B min: (?P<min>[0-9.]+ [KM]?)B max: (?P<max>[0-9.]+ [KM]?)B\\n\")\n",
" m = re.fullmatch(line_pattern, lines[-1])\n",
" if m is None:\n",
" print(f\"Line could not be parsed in {topic}: '{lines[-1]}'\")\n",
" continue\n",
"\n",
" msg_sizes[topic] = {'bw': parse_bytes(m.group(\"bw\")),\n",
" 'min': parse_bytes(m.group(\"min\")),\n",
" 'mean': parse_bytes(m.group(\"mean\")),\n",
" 'max': parse_bytes(m.group(\"max\"))}"
"from bw_interop.bw_plots import dds_lat_msg_size_scatter\n",
"plot_topic = \"\""
],
"metadata": {
"collapsed": false
@@ -511,11 +471,147 @@
"\n",
"from message_tree.message_tree_algorithms import e2e_paths_sorted_desc\n",
"from message_tree.message_tree_plots import e2e_breakdown_type_hist\n",
"from message_tree.message_tree_algorithms import owner\n",
"\n",
"\n",
"trees_paths = [e2e_paths_sorted_desc(tree, E2E_INPUT_TOPIC_PATTERNS) for tree in tqdm(trees, mininterval=10.0,\n",
" desc=\"Extracting E2E paths\")]\n",
"all_paths = [p for paths in trees_paths for p in paths]\n",
"#all_e2e_items = [i for p in all_paths for i in p]"
"# all_e2e_items = [i for p in all_paths for i in p]\n",
"# print(trees[0])\n",
"\n",
"lidar_paths = [p for p in all_paths if any(map(lambda inst: re.search(\"^/sensing/.*?pointcloud\", owner(inst)), p))]\n"
],
"metadata": {
"collapsed": false
}
},
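The `lidar_paths` filter keeps every E2E path that touches a pointcloud topic anywhere along the way; `owner(inst)` is assumed to return a string naming the instance's topic or owning entity, as in message_tree_algorithms. An equivalent, slightly more explicit spelling of the same filter:

```python
# Equivalent formulation of the filter above, without map/lambda.
lidar_paths = [
    p for p in all_paths
    if any(re.search(r"^/sensing/.*?pointcloud", owner(inst)) for inst in p)
]
```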
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"\n",
"from message_tree.message_tree_algorithms import aggregate_e2e_paths\n",
"\n",
"cohorts = aggregate_e2e_paths(lidar_paths) #all_paths)\n",
"cohort_pairs = [(k, v) for k, v in cohorts.items()]\n",
"cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n",
"\n",
"path_records = [{\"path\": path_key,\n",
" \"timestamp\": path[-1].timestamp,\n",
" \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n",
" for path_key, paths in cohort_pairs for path in paths if path]\n",
"\n",
"out_df = pd.DataFrame.from_records(path_records)\n",
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n",
"\n",
"df_print = out_df[['path', 'e2e_latency']].groupby(\"path\").agg(['count', 'mean', 'min', 'max']).reset_index()\n",
"df_print['path'] = df_print['path'].apply(lambda path: \" -> \".join(filter(lambda part: part.startswith(\"/\"), path.split(\" -> \"))))\n",
"df_print = df_print.sort_values((\"e2e_latency\", \"count\"), ascending=False)\n",
"df_print.to_csv(os.path.join(OUT_PATH, \"e2e_overview.csv\"), sep=\"\\t\", index=False)\n",
"df_print"
],
"metadata": {
"collapsed": false
}
},
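`aggregate_e2e_paths` is assumed to group concrete path instances by their identifying "… -> …" signature so that each cohort holds all instances of one unique path. The grouping idea, sketched with a hypothetical `key_fn` that names a single instance:

```python
# Sketch of the assumed grouping behind aggregate_e2e_paths: path instances
# are binned by their "a -> b -> c" signature string (key_fn is hypothetical).
from collections import defaultdict

def aggregate_paths_sketch(paths, key_fn):
    cohorts = defaultdict(list)
    for p in paths:
        cohorts[" -> ".join(key_fn(inst) for inst in p)].append(p)
    return dict(cohorts)
```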
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"import pickle\n",
"#with open(\"state.pkl\", \"wb\") as f:\n",
"# pickle.dump((trees_paths, all_paths, lidar_paths, cohorts), f)\n",
"with open(\"state.pkl\", \"rb\") as f:\n",
" (trees_paths, all_paths, lidar_paths, cohorts) = pickle.load(f)"
],
"metadata": {
"collapsed": false
}
},
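The cell above hard-codes the load branch and leaves the dump commented out. A guarded variant, as a sketch, would make the checkpoint self-managing:

```python
# Sketch: dump the state on the first run, load it on subsequent runs.
import os
import pickle

if os.path.exists("state.pkl"):
    with open("state.pkl", "rb") as f:
        trees_paths, all_paths, lidar_paths, cohorts = pickle.load(f)
else:
    with open("state.pkl", "wb") as f:
        pickle.dump((trees_paths, all_paths, lidar_paths, cohorts), f)
```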
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"COHORT_EXCL_PATTERNS = [\"hazard\", \"turn_indicator\", \"gear_cmd\", \"emergency_cmd\", \"external_cmd\", \"/control/operation_mode\",\n",
" \"/planning/scenario_planning/scenario$\"]\n",
"COHORT_INCL_PATTERNS = [\"BehaviorPathPlanner\", \"BehaviorVelocityPlanner\", \"pointcloud_preprocessor::Filter\"]\n",
"\n",
"cohorts_filt = {k: v for k, v in cohorts.items()\n",
" if not any(re.search(f, k) for f in COHORT_EXCL_PATTERNS) and all(re.search(f, k) for f in COHORT_INCL_PATTERNS)}\n",
"\n",
"\n",
"print(len(cohorts), len(cohorts_filt))\n",
"for k, v in cohorts_filt.items():\n",
" print(f\"\\n\\n ({len(v)})\\n \", end=\"\")\n",
" print(\"\\n -> \".join(k.split(\" -> \")))\n",
"\n",
"lidar_chain, lidar_cohort = next(iter(cohorts_filt.items()))"
],
"metadata": {
"collapsed": false
}
},
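Note the asymmetry in the filter: a cohort key survives only if it matches none of the exclude patterns and all of the include patterns. Its semantics on a single key (the key below is hypothetical and truncated):

```python
# Semantics of the cohort filter on one hypothetical key:
key = "/sensing/lidar/top/pointcloud -> pointcloud_preprocessor::Filter -> ..."
keep = (not any(re.search(p, key) for p in COHORT_EXCL_PATTERNS)
        and all(re.search(p, key) for p in COHORT_INCL_PATTERNS))
```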
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"def e2e_latency_breakdown__(path: list):\n",
" \"\"\"\n",
" Separates E2E latency into a sequence of dds, idle, and cpu times.\n",
" This method expects a publish instance at the last position in `path`.\n",
"\n",
" The return format is a list of the form [(\"<type>\", <time>), (\"<type>\", <time>), ...] with type bein gone of the\n",
" three mentioned above.\n",
" \"\"\"\n",
" ret_list: List[E2EBreakdownItem] = []\n",
"\n",
" cb_inst: TrCallbackInstance\n",
" cb_inst_prev: TrCallbackInstance\n",
" pub_inst: TrPublishInstance\n",
" pub_inst_prev: TrPublishInstance\n",
"\n",
" last_inst = None\n",
" for inst in path:\n",
" match inst:\n",
" case TrCallbackInstance() as cb_inst:\n",
" match last_inst:\n",
" case TrCallbackInstance() as cb_inst_prev:\n",
" ret_list.append(E2EBreakdownItem(\"cpu\", cb_inst_prev.duration,\n",
" (cb_inst_prev, cb_inst_prev)))\n",
" ret_list.append(E2EBreakdownItem(\"idle\", cb_inst.t_start - cb_inst_prev.t_end,\n",
" (cb_inst_prev, cb_inst)))\n",
" case TrPublishInstance() as pub_inst_prev:\n",
" ret_list.append(E2EBreakdownItem(\"dds\", cb_inst.t_start - pub_inst_prev.timestamp,\n",
" (pub_inst_prev, cb_inst)))\n",
" case TrPublishInstance() as pub_inst:\n",
" match last_inst:\n",
" case TrCallbackInstance() as cb_inst_prev:\n",
" ret_list.append(E2EBreakdownItem(\"cpu\", pub_inst.timestamp - cb_inst_prev.t_start,\n",
" (cb_inst_prev, pub_inst)))\n",
" case TrPublishInstance():\n",
" raise TypeError(f\"Found two publish instances in a row in an E2E path.\")\n",
" last_inst = inst\n",
"\n",
" if not isinstance(last_inst, TrPublishInstance):\n",
" raise TypeError(f\"Last instance in path is not a message but a {type(last_inst).__name__}\")\n",
"\n",
" return ret_list\n",
"\n",
"e2e_breakdowns = list(map(e2e_latency_breakdown__, lidar_cohort))\n",
"filt = [(path, bdown) for (path, bdown) in zip(lidar_cohort, e2e_breakdowns)\n",
" if not any(True for item in bdown\n",
" if item.type == \"idle\" and item.duration > item.location[1].callback_obj.owner.period * 1e-9)]\n",
"\n",
"lidar_cohort_orig = lidar_cohort\n",
"e2e_breakdowns_orig = e2e_breakdowns\n",
"\n",
"lidar_cohort, e2e_breakdowns = zip(*filt)"
],
"metadata": {
"collapsed": false
@@ -531,9 +627,42 @@
"\n",
"from message_tree.message_tree_algorithms import e2e_latency_breakdown\n",
"\n",
"conv_items = [i for p in tqdm(all_paths, mininterval=5.0, desc=\"Calculating E2E latency breakdowns\")\n",
" for i in e2e_latency_breakdown(p)]\n",
"e2e_breakdown_type_hist(conv_items)\n",
"conv_items = [i for p in e2e_breakdowns for i in p]\n",
"with open(\"out/plot_e2es_path.txt\", \"w\") as f:\n",
" f.write(f\"Number of path instances: {len(lidar_cohort)}\\n\")\n",
" f.write( \" \" + \"\\n -> \".join(lidar_chain.split(\" -> \")))\n",
" f.write(\"\\n\")\n",
"\n",
"conv_items_unique = set(conv_items)\n",
"\n",
"def e2e_breakdown_type_hist__(items):\n",
" \"\"\"\n",
" Given a list of e2e breakdown instances of the form `(\"<type>\", <duration>)`, plots a histogram for each encountered\n",
" type.\n",
" \"\"\"\n",
" plot_types = (\"dds\", \"idle\", \"cpu\")\n",
" #assert all(item.type in plot_types for item in items)\n",
"\n",
" plt.close(\"E2E type breakdown histograms\")\n",
" fig, axes = plt.subplots(1, 3, num=\"E2E type breakdown histograms\", dpi=300, figsize=(16, 9))\n",
" fig.suptitle(\"E2E Latency Breakdown by Resource Type\")\n",
"\n",
" for type, ax in zip(plot_types, axes):\n",
" durations = [item.duration for item in items if item.type == type]\n",
"\n",
" df = pd.Series(durations)\n",
" df.to_csv(f\"out/plot_e2es_{type}_portion.csv\", header=[f\"e2e_latency_{type}_portion_s\"], index=False)\n",
"\n",
" ax.set_title(type)\n",
" ax.hist(durations, bins=50)\n",
" #ax.set_yscale(\"log\")\n",
" ax.set_xlabel(\"Duration [s]\")\n",
" ax.set_ylabel(\"Occurrences\")\n",
"\n",
" return fig\n",
"\n",
"fig = e2e_breakdown_type_hist__(conv_items_unique)\n",
"plt.savefig(\"out/plot_e2e_portions.png\")\n",
"\n",
"None\n"
],
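`set(conv_items)` silently assumes `E2EBreakdownItem` is hashable (it is if it is a NamedTuple) and discards iteration order. An order-preserving dedup under the same hashability assumption:

```python
# Order-preserving alternative to set(); also requires hashable items.
conv_items_unique = list(dict.fromkeys(conv_items))
```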
@@ -547,22 +676,99 @@
"outputs": [],
"source": [
"%%skip_if_false E2E_ENABLED\n",
"%%skip_if_false E2E_PLOT\n",
"\n",
"from message_tree.message_tree_algorithms import aggregate_e2e_paths\n",
"e2es = [path[-1].timestamp - path[0].timestamp for path in lidar_cohort]\n",
"\n",
"cohorts = aggregate_e2e_paths(all_paths)\n",
"cohort_pairs = [(k, v) for k, v in cohorts.items()]\n",
"cohort_pairs.sort(key=lambda kv: len(kv[1]), reverse=True)\n",
"df = pd.Series(e2es)\n",
"df.to_csv(\"out/plot_e2es.csv\", index=False, header=[\"e2e_latency_s\"])\n",
"\n",
"path_records = [{\"path\": path_key,\n",
" \"timestamp\": path[-1].timestamp,\n",
" \"e2e_latency\": path[-1].timestamp - path[0].timestamp} \\\n",
" for path_key, paths in cohort_pairs for path in paths if path]\n",
"plt.close(\"E2E histogram\")\n",
"fig, ax = plt.subplots(num=\"E2E histogram\", dpi=300, figsize=(16, 9))\n",
"fig.suptitle(\"E2E Latency Histogram\")\n",
"ax: plt.Axes\n",
"ax.hist(e2es, bins=30)\n",
"ax.set_xlabel(\"E2E Latency [s]\")\n",
"ax.set_ylabel(\"Occurrences\")\n",
"ax.axvline(np.mean(e2es), c=\"red\", linewidth=2)\n",
"_, max_ylim = ax.get_ylim()\n",
"ax.text(np.mean(e2es) * 1.02, max_ylim * 0.98, 'Mean: {:.3f}s'.format(np.mean(e2es)))\n",
"plt.savefig(\"out/plot_e2es.png\")\n",
"None"
],
"metadata": {
"collapsed": false
}
},
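For the paper text it may help to quote headline numbers alongside the histogram; a small sketch reusing the `e2es` list from the cell above:

```python
# Sketch: summary statistics to accompany out/plot_e2es.png.
import numpy as np

e2es_arr = np.asarray(e2es)
print(f"n={e2es_arr.size} mean={e2es_arr.mean():.3f}s "
      f"median={np.median(e2es_arr):.3f}s p95={np.percentile(e2es_arr, 95):.3f}s")
```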
{
"cell_type": "code",
"execution_count": null,
"outputs": [],
"source": [
"from message_tree.message_tree_algorithms import _repr\n",
"from message_tree.message_tree_structure import E2EBreakdownItem\n",
"\n",
"out_df = pd.DataFrame.from_records(path_records)\n",
"out_df.to_csv(os.path.join(OUT_PATH, \"e2e.csv\"), sep=\"\\t\", index=False)\n",
"def label_latency_item(item: E2EBreakdownItem):\n",
" match item.type:\n",
" case \"cpu\":\n",
" return f\"{_repr(item.location[0])}\"\n",
" case \"idle\":\n",
" cb_inst: TrCallbackInstance = item.location[0]\n",
" owner = cb_inst.callback_obj.owner\n",
" match owner:\n",
" case TrTimer() as tmr:\n",
" tmr: TrTimer\n",
" node_name = tmr.node.path\n",
" case TrSubscriptionObject() as sub:\n",
" sub: TrSubscriptionObject\n",
" node_name = sub.subscription.node.path\n",
" case _:\n",
" raise TypeError()\n",
" return f\"{node_name}\"\n",
" case \"dds\":\n",
" msg_inst: TrPublishInstance = item.location[0]\n",
" return f\"{msg_inst.publisher.topic_name}\"\n",
" case _:\n",
" return ValueError()\n",
"\n",
"mode_cohort_key, mode_cohort = cohort_pairs[0]"
"plt.close(\"E2E path breakdown\")\n",
"fig, ax = plt.subplots(num=\"E2E path breakdown\", dpi=300, figsize=(16, 5))\n",
"fig.suptitle(\"E2E Latency Path Breakdown\")\n",
"ax: plt.Axes\n",
"\n",
"component_durations = list(zip(*[e2e_latency_breakdown__(p) for p in tqdm(lidar_cohort, desc=\"Calculating breakdowns\")]))\n",
"labels = [label_latency_item(item) for item in e2e_latency_breakdown__(lidar_cohort[0])]\n",
"types = [item.type for item in e2e_latency_breakdown__(lidar_cohort[0])]\n",
"component_durations = [list(map(lambda item: item.duration, d)) for d in component_durations]\n",
"print(len(component_durations), len(labels))\n",
"\n",
"import matplotlib.patches as mpatches\n",
"\n",
"legend_entries = []\n",
"def add_label(violin, label):\n",
" color = violin[\"bodies\"][0].get_facecolor().flatten()\n",
" legend_entries.append((mpatches.Patch(color=color), label))\n",
"\n",
"for type in (\"idle\", \"dds\", \"cpu\"):\n",
" indices = [i for i, t in enumerate(types) if t == type]\n",
" xs = [component_durations[i] for i in indices]\n",
" vln = ax.violinplot(xs, indices)\n",
" add_label(vln, type)\n",
" for i, x in zip(indices, xs):\n",
" df_out = pd.Series(x)\n",
" df_out.to_csv(f\"out/plot_e2es_violin_{i:02d}.csv\", index=False, header=[\"duration_s\"])\n",
"ax.set_ylabel(\"Latency contribution [s]\")\n",
"ax.set_xticks(range(len(labels)), labels, rotation=90)\n",
"ax.legend(*zip(*legend_entries))\n",
"plt.savefig(\"out/plot_e2es_violin.png\")\n",
"\n",
"df_labels = pd.Series(labels)\n",
"df_labels.to_csv(\"out/plot_e2es_violin_labels.csv\", index=False, header=[\"label\"])\n",
"\n",
"df_types = pd.Series(types)\n",
"df_types.to_csv(\"out/plot_e2es_violin_types.csv\", index=False, header=[\"type\"])\n",
"\n",
"None"
],
"metadata": {
"collapsed": false
@@ -579,10 +785,10 @@
"from message_tree.message_tree_plots import e2e_breakdown_inst_stack\n",
"\n",
"\n",
"mode_cohort_breakdown = [e2e_latency_breakdown(p) for p in mode_cohort[:200]]\n",
"print(len(mode_cohort))\n",
"print(mode_cohort_key.replace(\" -> \", \"\\n -> \"))\n",
"e2e_breakdown_inst_stack(*mode_cohort_breakdown)"
"fig = e2e_breakdown_inst_stack(*e2e_breakdowns)\n",
"fig.set_size_inches(16, 9)\n",
"fig.set_dpi(300)\n",
"None"
],
"metadata": {
"collapsed": false