dataflow-analysis/latency_graph/latency_graph.py

330 lines
11 KiB
Python
Raw Normal View History

from dataclasses import dataclass, field
from itertools import combinations
from multiprocessing import Pool
from typing import Optional, Set, List, Iterable, Dict, Tuple

from tqdm.contrib import concurrent
from tqdm.notebook import tqdm

from matching.subscriptions import sanitize
from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrSubscription, \
    TrTimer, TrPublishInstance, TrSubscriptionObject, TrTopic, TrCallbackInstance
# Fragments of topic names whose publishers are skipped when attributing
# publications to callbacks. Matching is by substring (`f in pub.topic_name`),
# so an entry without a leading slash matches anywhere in the topic name.
TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics"]
def _map_cb_times(args):
    """
    Find which publication timestamps fall inside a runtime interval of one callback.

    Takes a single tuple so that it can be used directly with `Pool.map`.

    :param args: Tuple `(cb_id, inst_times, pub_timestamps)` where
        `inst_times` is a list of `(start, end)` intervals of the callback's instances and
        `pub_timestamps` is a list of publication timestamps, expected to be sorted ascending
        (the two-pointer sweep below relies on it).
    :return: Dict mapping each index of `pub_timestamps` to a set that contains `cb_id` if the
        timestamp lies within one of the intervals (bounds inclusive), and is empty otherwise.
    """
    cb_id, inst_times, pub_timestamps = args
    pub_cb_overlaps = {i: set() for i in range(len(pub_timestamps))}

    # Work on a sorted copy: sorting in place would silently reorder the caller's list.
    # interval[0] is the start time.
    intervals = sorted(inst_times, key=lambda interval: interval[0])

    inst_iter = iter(intervals)
    pub_iter = iter(enumerate(pub_timestamps))

    # (None, None) marks exhaustion of the respective iterator.
    inst_start, inst_end = next(inst_iter, (None, None))
    i, t = next(pub_iter, (None, None))
    while inst_start is not None and i is not None:
        if inst_start <= t <= inst_end:
            pub_cb_overlaps[i].add(cb_id)

        if t <= inst_end:
            # Timestamp is before or inside the current interval: it is fully handled.
            i, t = next(pub_iter, (None, None))
        else:
            # Timestamp lies past the current interval: try the next interval.
            inst_start, inst_end = next(inst_iter, (None, None))

    return pub_cb_overlaps
def _get_cb_owner_node(cb: TrCallbackObject) -> TrNode | None:
match cb.owner:
case TrTimer(nodes=nodes):
owner_nodes = nodes
case TrSubscriptionObject(subscription=sub):
owner_nodes = [sub.node]
case _:
owner_nodes = []
if len(owner_nodes) > 1:
raise RuntimeError(f"CB has owners {', '.join(map(lambda n: n.path, owner_nodes))}")
elif not owner_nodes:
print("[WARN] CB has no owners")
return None
return owner_nodes[0]
def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']):
    """
    Arrange flat hierarchy levels into a tree according to their slash-separated names.

    Each node's `name` is split on "/" (leading/trailing slashes ignored). The last segment
    becomes the node's new `name`; the preceding segments select (or create, if missing) the
    chain of intermediate levels under which the node is attached. The given nodes are
    modified in place (`name` and `parent`).

    :param lg_nodes: The nodes to arrange, named by their full path.
    :return: A new, empty-named root level containing the resulting tree.
    """
    root = LGHierarchyLevel(None, [], "", [])

    for lg_node in lg_nodes:
        # str.split always yields at least one element, so leaf_name is always bound.
        *ancestors, leaf_name = lg_node.name.strip("/").split("/")
        lg_node.name = leaf_name

        # Walk down from the root, creating intermediate levels on demand.
        level = root
        for segment in ancestors:
            child = next((c for c in level.children if c.name == segment), None)
            if child is None:
                child = LGHierarchyLevel(level, [], segment, [])
                level.children.append(child)
            level = child

        level.children.append(lg_node)
        lg_node.parent = level

    return root
def inst_runtime_interval(cb_inst: TrCallbackInstance):
    """
    Return the runtime interval of a callback instance as a `(start, end)` tuple.

    `start` is `cb_inst.timestamp.timestamp()` and `end` is `start` plus
    `cb_inst.duration.total_seconds()`.

    :param cb_inst: The callback instance; its `timestamp` must offer `.timestamp()` and its
        `duration` must offer `.total_seconds()` (presumably a datetime and a timedelta).
    :return: Tuple `(start, end)`.
    """
    start = cb_inst.timestamp.timestamp()
    return start, start + cb_inst.duration.total_seconds()
def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher):
    """
    Heuristically determine which of the given callbacks publish via the given publisher.

    A publication is attributed to a callback if its timestamp lies within the runtime
    interval of one of that callback's instances. Publications covered by exactly one
    callback nominate that callback. Publications covered by several callbacks only produce
    pairs that are checked for one callback always co-occurring with the other.

    :param cbs: The candidate callbacks.
    :param pub: The publisher whose publication instances are examined.
    :return: The set of callbacks assumed to publish via `pub`.
    """
    # Publication timestamps are scaled by 1e-9 (presumably ns -> s, to match the
    # callback intervals -- verify against the tracing types).
    # Sorted ascending so each worker can do a two-pointer sweep: O(m+n) instead of O(m*n).
    pub_timestamps = sorted(inst.timestamp * 1e-9 for inst in pub.instances)

    callbacks_by_id = {cb.id: cb for cb in cbs}
    worker_args = [
        (cb.id, [inst_runtime_interval(inst) for inst in cb.callback_instances], pub_timestamps)
        for cb in cbs
    ]

    with Pool() as pool:
        per_cb_overlaps = pool.map(_map_cb_times, worker_args)

    # Merge the per-callback results: publication index -> callbacks running at that time.
    pub_cb_overlaps = {idx: set() for idx, _ in enumerate(pub_timestamps)}
    for overlaps in per_cb_overlaps:
        for idx, cb_ids in overlaps.items():
            pub_cb_overlaps[idx].update(callbacks_by_id[cb_id] for cb_id in cb_ids)

    pub_cbs = set()
    overlapping_pairs = set()
    for candidates in pub_cb_overlaps.values():
        if len(candidates) == 1:
            pub_cbs.update(candidates)
        elif candidates:  # Multiple callbacks were running during this publication
            overlapping_pairs.update(combinations(candidates, 2))
        else:
            print(f"[WARN] Publication on {pub.topic_name} without corresponding callback!")

    for cb1, cb2 in overlapping_pairs:
        # Does one callback ever cover a publication that the other one does not?
        cb1_seen_without_cb2 = any(
            cb1 in candidates and cb2 not in candidates for candidates in pub_cb_overlaps.values())
        cb2_seen_without_cb1 = any(
            cb2 in candidates and cb1 not in candidates for candidates in pub_cb_overlaps.values())

        if not cb1_seen_without_cb2 and not cb2_seen_without_cb1:
            print(f"[WARN] Callbacks {cb1.id} and {cb2.id} always run in parallel")
        elif not cb1_seen_without_cb2:
            pub_cbs.discard(cb1)
        elif not cb2_seen_without_cb1:
            pub_cbs.discard(cb2)
        # Otherwise both were seen independently of each other: discard neither.

    return pub_cbs
def _get_cb_topic_deps(nodes_to_cbs: Dict[TrNode, Set[TrCallbackObject]]):
    """
    Determine, per topic, which callbacks subscribe to it and which (heuristically) publish it.

    :param nodes_to_cbs: Mapping from node to the callbacks it owns. A `None` key is allowed
        for callbacks without a node; those are skipped in the publication step.
    :return: Tuple `(cbs_subbed_to_topic, cbs_publishing_topic)`, both mapping a topic to a
        set of callbacks.
    :raises RuntimeError: If a callback's owner is neither a subscription object, a timer,
        nor `None`.
    """
    cbs_subbed_to_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}

    # Find topics the callback EXPLICITLY depends on
    # - Timer callbacks: no EXPLICIT dependencies
    # - Subscription callbacks: CB depends on the subscribed topic. Possibly also has other IMPLICIT dependencies
    p = tqdm(desc="Processing CB subscriptions", total=sum(map(len, nodes_to_cbs.values())))
    for cbs in nodes_to_cbs.values():
        for cb in cbs:
            p.update()

            # Read the owner once; isinstance (rather than an exact type comparison) matches
            # the class-pattern dispatch used in _get_cb_owner_node and accepts subclasses.
            owner = cb.owner
            if isinstance(owner, TrSubscriptionObject):
                cbs_subbed_to_topic.setdefault(owner.subscription.topic, set()).add(cb)
            elif isinstance(owner, TrTimer) or owner is None:
                # Timers have no explicit topic dependency; ownerless callbacks are skipped.
                continue
            else:
                raise RuntimeError(
                    f"Callback owners other than timers/subscriptions cannot be handled: {owner}")

    # Find topics the callback publishes to (HEURISTICALLY!)
    # For topics published to during the runtime of the callback's instances,
    # assume that they are published by the callback
    cbs_publishing_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}
    p = tqdm(desc="Processing node publications", total=len(nodes_to_cbs))
    for node, cbs in nodes_to_cbs.items():
        p.update()
        if node is None:
            continue

        for pub in node.publishers:
            # Skip publishers whose topic name contains any filtered fragment.
            if any(f in pub.topic_name for f in TOPIC_FILTERS):
                continue

            pub_cbs = _get_publishing_cbs(cbs, pub)
            cbs_publishing_topic.setdefault(pub.topic, set()).update(pub_cbs)

    return cbs_subbed_to_topic, cbs_publishing_topic
@dataclass
class LGCallback:
    """A callback vertex of the latency graph, with the topics it consumes and produces."""

    # Display name of the callback
    name: str
    # Topics this callback depends on
    in_topics: List[TrTopic]
    # Topics this callback publishes to
    out_topics: List[TrTopic]

    def id(self):
        """Return the identifier of this callback, which is its name."""
        identifier = self.name
        return identifier
@dataclass
class LGTrCallback(LGCallback):
    """A latency graph callback that is backed by a traced callback object."""

    # The traced callback object
    cb: TrCallbackObject
    # The callback's symbol, if one is known
    sym: TrCallbackSymbol | None
    # The node owning the callback, if any
    node: TrNode | None

    def id(self):
        """Return the traced callback object's id as a string."""
        cb_id = self.cb.id
        return str(cb_id)
@dataclass
class LGHierarchyLevel:
    """
    One level of the latency graph's hierarchy: a named tree node that holds callbacks and
    child levels.
    """

    # Enclosing level, or None for a root. Excluded from the generated __eq__: children point
    # back to their parent, so comparing it would recurse through the parent/children cycle
    # until a RecursionError is raised.
    parent: Optional['LGHierarchyLevel'] = field(compare=False)
    # Levels directly below this one
    children: List['LGHierarchyLevel']
    # Name of this level (a single path segment once placed in a hierarchy)
    name: str
    # Callbacks located directly on this level
    callbacks: List[LGCallback]

    @property
    def full_name(self):
        """Return the slash-joined names from the root level down to this level."""
        if self.parent is None:
            return f"{self.name}"

        return f"{self.parent.full_name}/{self.name}"
@dataclass
class LGEdge:
    """A directed edge of the latency graph, leading from one callback to another."""

    # Callback the edge originates from
    start: LGCallback
    # Callback the edge leads to
    end: LGCallback
@dataclass
class LatencyGraph:
    """
    Graph of callbacks connected via the topics they publish and subscribe to, with the
    callbacks grouped into a hierarchy derived from their nodes' paths.
    """

    # Root of the hierarchy of nodes and their callbacks
    top_node: LGHierarchyLevel
    # Edges from publishing callbacks to subscribing callbacks
    edges: List[LGEdge]

    def __init__(self, tr: TrContext):
        """
        Build the latency graph from a tracing context.

        :param tr: The tracing context providing `callback_objects`, `publish_instances`
            and `topics`.
        """
        ##################################################
        # Annotate nodes with their callbacks
        ##################################################

        # Note that nodes can also be None!
        nodes_to_cbs = {}
        p = tqdm(desc="Finding CB nodes", total=len(tr.callback_objects))
        for cb in tr.callback_objects.values():
            p.update()
            node = _get_cb_owner_node(cb)
            nodes_to_cbs.setdefault(node, set()).add(cb)

        ##################################################
        # Find in/out topics for each callback
        ##################################################

        cbs_subbed_to_topic, cbs_publishing_topic = _get_cb_topic_deps(nodes_to_cbs)

        ##################################################
        # Map topics to their messages
        ##################################################

        # NOTE(review): this mapping is built but not used anywhere below -- confirm whether
        # it is still needed or can be removed.
        topics_to_messages = {}
        p = tqdm(desc="Mapping messages to topics", total=len(tr.publish_instances))
        for pub_inst in tr.publish_instances:
            p.update()
            try:
                topic = pub_inst.publisher.topic
            except KeyError:
                continue
            topics_to_messages.setdefault(topic, []).append(pub_inst)

        ##################################################
        # Define nodes and edges on lowest level
        ##################################################

        # Synthetic callbacks: INPUT "publishes" all topics without publishers,
        # OUTPUT "subscribes" to all topics without subscriptions.
        input_cb = LGCallback("INPUT", [], [topic for topic in tr.topics.values() if not topic.publishers])
        output_cb = LGCallback("OUTPUT", [topic for topic in tr.topics.values() if not topic.subscriptions], [])

        in_node = LGHierarchyLevel(None, [], "INPUT", [input_cb])
        out_node = LGHierarchyLevel(None, [], "OUTPUT", [output_cb])

        lg_nodes = [in_node, out_node]

        tr_to_lg_cb = {}

        # The description must be passed as `desc=`; tqdm's first positional parameter is the
        # iterable to wrap, not the description.
        p = tqdm(desc="Building graph nodes", total=sum(map(len, nodes_to_cbs.values())))
        for node, cbs in nodes_to_cbs.items():
            node_callbacks = []

            for cb in cbs:
                p.update()

                try:
                    sym = cb.callback_symbol
                    pretty_sym = sanitize(sym.symbol)
                except KeyError:
                    # No symbol available: fall back to the callback's id as its name.
                    sym = None
                    pretty_sym = cb.id
                in_topics = [topic for topic, subscribers in cbs_subbed_to_topic.items() if cb in subscribers]
                out_topics = [topic for topic, publishers in cbs_publishing_topic.items() if cb in publishers]
                lg_cb = LGTrCallback(pretty_sym, in_topics, out_topics, cb, sym, node)
                node_callbacks.append(lg_cb)
                tr_to_lg_cb[cb] = lg_cb

            lg_node = LGHierarchyLevel(None, [], node.path if node else "[NONE]", node_callbacks)
            lg_nodes.append(lg_node)

        edges = []
        p = tqdm(desc="Building graph edges", total=len(tr.topics))
        for topic in tr.topics.values():
            p.update()
            sub_cbs = cbs_subbed_to_topic.get(topic, ())
            pub_cbs = cbs_publishing_topic.get(topic, ())

            # One edge per (publishing callback, subscribing callback) pair on this topic.
            edges.extend(
                LGEdge(tr_to_lg_cb[pub_cb], tr_to_lg_cb[sub_cb])
                for sub_cb in sub_cbs
                for pub_cb in pub_cbs
            )

        self.edges = edges

        ##################################################
        # Nodes into hierarchy levels
        ##################################################

        self.top_node = _hierarchize(lg_nodes)

    def to_gv(self):
        """Placeholder; currently does nothing and returns None."""
        pass