dataflow-analysis/latency_graph/latency_graph.py

290 lines
9.8 KiB
Python
Raw Normal View History

from bisect import bisect_left, bisect
from dataclasses import dataclass
from itertools import combinations
from multiprocessing import Pool
from typing import Optional, Set, List, Iterable, Dict, Tuple
import numpy as np
from tqdm.notebook import tqdm
from tqdm.contrib import concurrent
from matching.subscriptions import sanitize
from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrSubscription, \
TrTimer, TrPublishInstance, TrSubscriptionObject, TrTopic, TrCallbackInstance, Timestamp
TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics", "/rosout"]
def _get_cb_owner_node(cb: TrCallbackObject) -> TrNode | None:
match cb.owner:
case TrTimer(node=node):
owner_node = node
case TrSubscriptionObject(subscription=sub):
owner_node = sub.node
case _:
owner_node = None
if not owner_node:
print("[WARN] CB has no owners")
return None
return owner_node
def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']):
base = LGHierarchyLevel(None, [], "", [])
def _insert(parent, node, path):
match path:
case []:
parent.children.append(node)
node.parent = parent
case [head, *tail]:
next_node = next(iter(n for n in parent.children if n.name == head), None)
if next_node is None:
next_node = LGHierarchyLevel(parent, [], head, [])
parent.children.append(next_node)
_insert(next_node, node, tail)
for node in lg_nodes:
path = node.name.strip("/").split("/")
node.name = path[-1]
_insert(base, node, path[:-1])
return base
def inst_runtime_interval(cb_inst: TrCallbackInstance):
start_time = cb_inst.timestamp
end_time = start_time + cb_inst.duration
return start_time, end_time
def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher):
"""
Counts number of publication instances that lie within one of the cb_intervals.
"""
pub_insts = pub.instances
pub_cb_overlaps = {i: set() for i in range(len(pub_insts))}
for cb in cbs:
cb_intervals = map(inst_runtime_interval, cb.callback_instances)
for t_start, t_end in cb_intervals:
i_overlap_begin = bisect_left(pub_insts, t_start, key=lambda x: x.timestamp)
i_overlap_end = bisect(pub_insts, t_end, key=lambda x: x.timestamp)
for i in range(i_overlap_begin, i_overlap_end):
pub_cb_overlaps[i].add(cb)
pub_cbs = set()
cb_cb_overlaps = set()
for i, i_cbs in pub_cb_overlaps.items():
if not i_cbs:
print(f"[WARN] Publication on {pub.topic_name} without corresponding callback!")
elif len(i_cbs) == 1:
pub_cbs.update(i_cbs)
else: # Multiple CBs in i_cbs
cb_cb_overlaps.update(iter(combinations(i_cbs, 2)))
for cb1, cb2 in cb_cb_overlaps:
cb1_subset_of_cb2 = True
cb2_subset_of_cb1 = True
for i_cbs in pub_cb_overlaps.values():
if cb1 in i_cbs and cb2 not in i_cbs:
cb1_subset_of_cb2 = False
if cb2 in i_cbs and cb1 not in i_cbs:
cb2_subset_of_cb1 = False
if cb1_subset_of_cb2 and cb2_subset_of_cb1:
print(f"[WARN] Callbacks {cb1.id} and {cb2.id} always run in parallel")
elif cb1_subset_of_cb2:
pub_cbs.discard(cb1)
elif cb2_subset_of_cb1:
pub_cbs.discard(cb2)
# else: discard none of them
return pub_cbs
def _get_cb_topic_deps(nodes_to_cbs: Dict[TrNode, Set[TrCallbackObject]]):
cbs_subbed_to_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}
# Find topics the callback EXPLICITLY depends on
# - Timer callbacks: no EXPLICIT dependencies
# - Subscription callbacks: CB depends on the subscribed topic. Possibly also has other IMPLICIT dependencies
p = tqdm(desc="Processing CB subscriptions", total=sum(map(len, nodes_to_cbs.values())))
for node, cbs in nodes_to_cbs.items():
for cb in cbs:
p.update()
if type(cb.owner) == TrSubscriptionObject:
dep_topics = [cb.owner.subscription.topic]
elif type(cb.owner) == TrTimer:
dep_topics = []
elif cb.owner is None:
continue
else:
print(f" [WARN] Callback owners other than timers/subscriptions cannot be handled: {cb.owner}")
continue
for topic in dep_topics:
if topic not in cbs_subbed_to_topic:
cbs_subbed_to_topic[topic] = set()
cbs_subbed_to_topic[topic].add(cb)
# Find topics the callback publishes to (HEURISTICALLY!)
# For topics published to during the runtime of the callback's instances,
# assume that they are published by the callback
cbs_publishing_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}
cb_publishers: Dict[TrCallbackObject, Set[TrPublisher]] = {}
for node, cbs in tqdm(nodes_to_cbs.items(), desc="Processing node publications"):
if node is None:
continue
for pub in node.publishers:
if any(f in pub.topic_name for f in TOPIC_FILTERS):
continue
pub_cbs = _get_publishing_cbs(cbs, pub)
for cb in pub_cbs:
if cb not in cb_publishers:
cb_publishers[cb] = set()
cb_publishers[cb].add(pub)
if pub.topic not in cbs_publishing_topic:
cbs_publishing_topic[pub.topic] = set()
cbs_publishing_topic[pub.topic].update(pub_cbs)
return cbs_subbed_to_topic, cbs_publishing_topic, cb_publishers
@dataclass
class LGCallback:
name: str
in_topics: List[TrTopic]
out_topics: List[TrTopic]
def id(self):
return self.name
@dataclass
class LGTrCallback(LGCallback):
cb: TrCallbackObject
sym: TrCallbackSymbol | None
node: TrNode | None
def id(self):
return str(self.cb.id)
@dataclass
class LGHierarchyLevel:
parent: Optional['LGHierarchyLevel']
children: List['LGHierarchyLevel']
name: str
callbacks: List[LGCallback]
@property
def full_name(self):
if self.parent is None:
return f"{self.name}"
return f"{self.parent.full_name}/{self.name}"
@dataclass
class LGEdge:
start: LGCallback
end: LGCallback
topic: TrTopic
@dataclass
class LatencyGraph:
top_node: LGHierarchyLevel
edges: List[LGEdge]
cb_pubs: Dict[TrCallbackObject, Set[TrPublisher]]
pub_cbs: Dict[TrPublisher, Set[TrCallbackObject]]
def __init__(self, tr: TrContext):
##################################################
# Annotate nodes with their callbacks
##################################################
# Note that nodes can also be None!
nodes_to_cbs = {}
for cb in tqdm(tr.callback_objects, desc="Finding CB nodes"):
node = _get_cb_owner_node(cb)
if node not in nodes_to_cbs:
nodes_to_cbs[node] = set()
nodes_to_cbs[node].add(cb)
##################################################
# Find in/out topics for each callback
##################################################
cbs_subbed_to_topic, cbs_publishing_topic, cb_pubs = _get_cb_topic_deps(nodes_to_cbs)
pub_cbs = {}
for cb, pubs in cb_pubs.items():
for pub in pubs:
if pub not in pub_cbs:
pub_cbs[pub] = set()
pub_cbs[pub].add(cb)
self.cb_pubs = cb_pubs
self.pub_cbs = pub_cbs
##################################################
# Define nodes and edges on lowest level
##################################################
input = LGCallback("INPUT", [], [topic for topic in tr.topics if not topic.publishers])
output = LGCallback("OUTPUT", [topic for topic in tr.topics if not topic.subscriptions], [])
in_node = LGHierarchyLevel(None, [], "INPUT", [input])
out_node = LGHierarchyLevel(None, [], "OUTPUT", [output])
lg_nodes = [in_node, out_node]
tr_to_lg_cb = {}
p = tqdm(desc="Building graph nodes", total=sum(map(len, nodes_to_cbs.values())))
for node, cbs in nodes_to_cbs.items():
node_callbacks = []
for cb in cbs:
p.update()
sym = cb.callback_symbol
if sym is not None:
pretty_sym = sanitize(sym.symbol)
else:
pretty_sym = cb.id
in_topics = [topic for topic, cbs in cbs_subbed_to_topic.items() if cb in cbs]
out_topics = [topic for topic, cbs in cbs_publishing_topic.items() if cb in cbs]
lg_cb = LGTrCallback(pretty_sym, in_topics, out_topics, cb, sym, node)
node_callbacks.append(lg_cb)
tr_to_lg_cb[cb] = lg_cb
lg_node = LGHierarchyLevel(None, [], node.path if node else "[NONE]", node_callbacks)
lg_nodes.append(lg_node)
edges = []
for topic in tqdm(tr.topics, desc="Building graph edges"):
sub_cbs = cbs_subbed_to_topic[topic] if topic in cbs_subbed_to_topic else []
pub_cbs = cbs_publishing_topic[topic] if topic in cbs_publishing_topic else []
for sub_cb in sub_cbs:
for pub_cb in pub_cbs:
lg_edge = LGEdge(tr_to_lg_cb[pub_cb], tr_to_lg_cb[sub_cb], topic)
edges.append(lg_edge)
self.edges = edges
##################################################
# Nodes into hierarchy levels
##################################################
self.top_node = _hierarchize(lg_nodes)