dataflow-analysis/latency_graph/latency_graph.py

290 lines
9.8 KiB
Python
Raw Normal View History

from bisect import bisect_left, bisect
from dataclasses import dataclass
from itertools import combinations
from multiprocessing import Pool
from typing import Optional, Set, List, Iterable, Dict, Tuple
import numpy as np
2022-09-15 16:17:24 +02:00
from tqdm import tqdm
from tqdm.contrib import concurrent
from matching.subscriptions import sanitize
from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrSubscription, \
TrTimer, TrPublishInstance, TrSubscriptionObject, TrTopic, TrCallbackInstance, Timestamp
TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics", "/rosout"]
def _get_cb_owner_node(cb: TrCallbackObject) -> TrNode | None:
match cb.owner:
case TrTimer(node=node):
owner_node = node
case TrSubscriptionObject(subscription=sub):
owner_node = sub.node
case _:
owner_node = None
if not owner_node:
print("[WARN] CB has no owners")
return None
return owner_node
def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']):
base = LGHierarchyLevel(None, [], "", [])
def _insert(parent, node, path):
match path:
case []:
parent.children.append(node)
node.parent = parent
case [head, *tail]:
next_node = next(iter(n for n in parent.children if n.name == head), None)
if next_node is None:
next_node = LGHierarchyLevel(parent, [], head, [])
parent.children.append(next_node)
_insert(next_node, node, tail)
for node in lg_nodes:
path = node.name.strip("/").split("/")
node.name = path[-1]
_insert(base, node, path[:-1])
return base
def inst_runtime_interval(cb_inst: TrCallbackInstance):
start_time = cb_inst.timestamp
end_time = start_time + cb_inst.duration
return start_time, end_time
def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher):
"""
Counts number of publication instances that lie within one of the cb_intervals.
"""
pub_insts = pub.instances
pub_cb_overlaps = {i: set() for i in range(len(pub_insts))}
for cb in cbs:
cb_intervals = map(inst_runtime_interval, cb.callback_instances)
for t_start, t_end in cb_intervals:
i_overlap_begin = bisect_left(pub_insts, t_start, key=lambda x: x.timestamp)
i_overlap_end = bisect(pub_insts, t_end, key=lambda x: x.timestamp)
for i in range(i_overlap_begin, i_overlap_end):
pub_cb_overlaps[i].add(cb)
pub_cbs = set()
cb_cb_overlaps = set()
for i, i_cbs in pub_cb_overlaps.items():
if not i_cbs:
print(f"[WARN] Publication on {pub.topic_name} without corresponding callback!")
elif len(i_cbs) == 1:
pub_cbs.update(i_cbs)
else: # Multiple CBs in i_cbs
cb_cb_overlaps.update(iter(combinations(i_cbs, 2)))
for cb1, cb2 in cb_cb_overlaps:
cb1_subset_of_cb2 = True
cb2_subset_of_cb1 = True
for i_cbs in pub_cb_overlaps.values():
if cb1 in i_cbs and cb2 not in i_cbs:
cb1_subset_of_cb2 = False
if cb2 in i_cbs and cb1 not in i_cbs:
cb2_subset_of_cb1 = False
if cb1_subset_of_cb2 and cb2_subset_of_cb1:
print(f"[WARN] Callbacks {cb1.id} and {cb2.id} always run in parallel")
elif cb1_subset_of_cb2:
pub_cbs.discard(cb1)
elif cb2_subset_of_cb1:
pub_cbs.discard(cb2)
# else: discard none of them
return pub_cbs
def _get_cb_topic_deps(nodes_to_cbs: Dict[TrNode, Set[TrCallbackObject]]):
cbs_subbed_to_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}
# Find topics the callback EXPLICITLY depends on
# - Timer callbacks: no EXPLICIT dependencies
# - Subscription callbacks: CB depends on the subscribed topic. Possibly also has other IMPLICIT dependencies
p = tqdm(desc="Processing CB subscriptions", total=sum(map(len, nodes_to_cbs.values())))
for node, cbs in nodes_to_cbs.items():
for cb in cbs:
p.update()
if type(cb.owner) == TrSubscriptionObject:
dep_topics = [cb.owner.subscription.topic]
elif type(cb.owner) == TrTimer:
dep_topics = []
elif cb.owner is None:
continue
else:
print(f" [WARN] Callback owners other than timers/subscriptions cannot be handled: {cb.owner}")
continue
for topic in dep_topics:
if topic not in cbs_subbed_to_topic:
cbs_subbed_to_topic[topic] = set()
cbs_subbed_to_topic[topic].add(cb)
# Find topics the callback publishes to (HEURISTICALLY!)
# For topics published to during the runtime of the callback's instances,
# assume that they are published by the callback
cbs_publishing_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}
cb_publishers: Dict[TrCallbackObject, Set[TrPublisher]] = {}
for node, cbs in tqdm(nodes_to_cbs.items(), desc="Processing node publications"):
if node is None:
continue
for pub in node.publishers:
if any(f in pub.topic_name for f in TOPIC_FILTERS):
continue
pub_cbs = _get_publishing_cbs(cbs, pub)
for cb in pub_cbs:
if cb not in cb_publishers:
cb_publishers[cb] = set()
cb_publishers[cb].add(pub)
if pub.topic not in cbs_publishing_topic:
cbs_publishing_topic[pub.topic] = set()
cbs_publishing_topic[pub.topic].update(pub_cbs)
return cbs_subbed_to_topic, cbs_publishing_topic, cb_publishers
@dataclass
class LGCallback:
name: str
in_topics: List[TrTopic]
out_topics: List[TrTopic]
def id(self):
return self.name
@dataclass
class LGTrCallback(LGCallback):
cb: TrCallbackObject
sym: TrCallbackSymbol | None
node: TrNode | None
def id(self):
return str(self.cb.id)
@dataclass
class LGHierarchyLevel:
parent: Optional['LGHierarchyLevel']
children: List['LGHierarchyLevel']
name: str
callbacks: List[LGCallback]
@property
def full_name(self):
if self.parent is None:
return f"{self.name}"
return f"{self.parent.full_name}/{self.name}"
@dataclass
class LGEdge:
start: LGCallback
end: LGCallback
topic: TrTopic
@dataclass
class LatencyGraph:
top_node: LGHierarchyLevel
edges: List[LGEdge]
cb_pubs: Dict[TrCallbackObject, Set[TrPublisher]]
pub_cbs: Dict[TrPublisher, Set[TrCallbackObject]]
def __init__(self, tr: TrContext):
##################################################
# Annotate nodes with their callbacks
##################################################
# Note that nodes can also be None!
nodes_to_cbs = {}
for cb in tqdm(tr.callback_objects, desc="Finding CB nodes"):
node = _get_cb_owner_node(cb)
if node not in nodes_to_cbs:
nodes_to_cbs[node] = set()
nodes_to_cbs[node].add(cb)
##################################################
# Find in/out topics for each callback
##################################################
cbs_subbed_to_topic, cbs_publishing_topic, cb_pubs = _get_cb_topic_deps(nodes_to_cbs)
pub_cbs = {}
for cb, pubs in cb_pubs.items():
for pub in pubs:
if pub not in pub_cbs:
pub_cbs[pub] = set()
pub_cbs[pub].add(cb)
self.cb_pubs = cb_pubs
self.pub_cbs = pub_cbs
##################################################
# Define nodes and edges on lowest level
##################################################
input = LGCallback("INPUT", [], [topic for topic in tr.topics if not topic.publishers])
output = LGCallback("OUTPUT", [topic for topic in tr.topics if not topic.subscriptions], [])
in_node = LGHierarchyLevel(None, [], "INPUT", [input])
out_node = LGHierarchyLevel(None, [], "OUTPUT", [output])
lg_nodes = [in_node, out_node]
tr_to_lg_cb = {}
p = tqdm(desc="Building graph nodes", total=sum(map(len, nodes_to_cbs.values())))
for node, cbs in nodes_to_cbs.items():
node_callbacks = []
for cb in cbs:
p.update()
sym = cb.callback_symbol
if sym is not None:
pretty_sym = sanitize(sym.symbol)
else:
pretty_sym = cb.id
in_topics = [topic for topic, cbs in cbs_subbed_to_topic.items() if cb in cbs]
out_topics = [topic for topic, cbs in cbs_publishing_topic.items() if cb in cbs]
lg_cb = LGTrCallback(pretty_sym, in_topics, out_topics, cb, sym, node)
node_callbacks.append(lg_cb)
tr_to_lg_cb[cb] = lg_cb
lg_node = LGHierarchyLevel(None, [], node.path if node else "[NONE]", node_callbacks)
lg_nodes.append(lg_node)
edges = []
for topic in tqdm(tr.topics, desc="Building graph edges"):
sub_cbs = cbs_subbed_to_topic[topic] if topic in cbs_subbed_to_topic else []
pub_cbs = cbs_publishing_topic[topic] if topic in cbs_publishing_topic else []
for sub_cb in sub_cbs:
for pub_cb in pub_cbs:
lg_edge = LGEdge(tr_to_lg_cb[pub_cb], tr_to_lg_cb[sub_cb], topic)
edges.append(lg_edge)
self.edges = edges
##################################################
# Nodes into hierarchy levels
##################################################
self.top_node = _hierarchize(lg_nodes)