E2E calculation per output message, plotting of E2E over time, E2E breakdown charts, E2E message flow charts

This commit is contained in:
Maximilian Schmeller 2022-08-29 22:17:52 +02:00
parent 241a7c3bf2
commit 8eee45c79a
7 changed files with 1212 additions and 339 deletions

View file

@ -1,58 +1,34 @@
from bisect import bisect_left, bisect
from dataclasses import dataclass from dataclasses import dataclass
from itertools import combinations from itertools import combinations
from multiprocessing import Pool from multiprocessing import Pool
from typing import Optional, Set, List, Iterable, Dict, Tuple from typing import Optional, Set, List, Iterable, Dict, Tuple
import numpy as np
from tqdm.notebook import tqdm from tqdm.notebook import tqdm
from tqdm.contrib import concurrent from tqdm.contrib import concurrent
from matching.subscriptions import sanitize from matching.subscriptions import sanitize
from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrSubscription, \ from tracing_interop.tr_types import TrContext, TrCallbackObject, TrCallbackSymbol, TrNode, TrPublisher, TrSubscription, \
TrTimer, TrPublishInstance, TrSubscriptionObject, TrTopic, TrCallbackInstance TrTimer, TrPublishInstance, TrSubscriptionObject, TrTopic, TrCallbackInstance, Timestamp
TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics", "/rosout"]
TOPIC_FILTERS = ["/parameter_events", "/tf_static", "/robot_description", "diagnostics"]
def _map_cb_times(args):
cb_id, inst_times, pub_timestamps = args
pub_cb_overlaps = {i: set() for i in range(len(pub_timestamps))}
inst_times.sort(key=lambda tup: tup[0]) # tup[0] is start time
inst_iter = iter(inst_times)
pub_iter = iter(enumerate(pub_timestamps))
inst_start, inst_end = next(inst_iter, (None, None))
i, t = next(pub_iter, (None, None))
while inst_start is not None and i is not None:
if inst_start <= t <= inst_end:
pub_cb_overlaps[i].add(cb_id)
if t <= inst_end:
i, t = next(pub_iter, (None, None))
else:
inst_start, inst_end = next(inst_iter, (None, None))
return pub_cb_overlaps
def _get_cb_owner_node(cb: TrCallbackObject) -> TrNode | None: def _get_cb_owner_node(cb: TrCallbackObject) -> TrNode | None:
match cb.owner: match cb.owner:
case TrTimer(nodes=nodes): case TrTimer(node=node):
owner_nodes = nodes owner_node = node
case TrSubscriptionObject(subscription=sub): case TrSubscriptionObject(subscription=sub):
owner_nodes = [sub.node] owner_node = sub.node
case _: case _:
owner_nodes = [] owner_node = None
if len(owner_nodes) > 1: if not owner_node:
raise RuntimeError(f"CB has owners {', '.join(map(lambda n: n.path, owner_nodes))}")
elif not owner_nodes:
print("[WARN] CB has no owners") print("[WARN] CB has no owners")
return None return None
return owner_nodes[0] return owner_node
def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']): def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']):
@ -79,33 +55,25 @@ def _hierarchize(lg_nodes: Iterable['LGHierarchyLevel']):
def inst_runtime_interval(cb_inst: TrCallbackInstance): def inst_runtime_interval(cb_inst: TrCallbackInstance):
inst_t_min = cb_inst.timestamp.timestamp() start_time = cb_inst.timestamp
inst_t_max = inst_t_min + cb_inst.duration.total_seconds() end_time = start_time + cb_inst.duration
return inst_t_min, inst_t_max return start_time, end_time
def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher): def _get_publishing_cbs(cbs: Set[TrCallbackObject], pub: TrPublisher):
""" """
Counts number of publication instances that lie within one of the cb_intervals. Counts number of publication instances that lie within one of the cb_intervals.
""" """
pub_timestamps = [inst.timestamp * 1e-9 for inst in pub.instances] pub_insts = pub.instances
pub_cb_overlaps = {i: set() for i in range(len(pub_insts))}
# Algorithm: Two-pointer method for cb in cbs:
# With both the pub_timestamps and cb_intervals sorted ascending, cb_intervals = map(inst_runtime_interval, cb.callback_instances)
# we can cut down the O(m*n) comparisons to O(m+n). for t_start, t_end in cb_intervals:
pub_timestamps.sort() i_overlap_begin = bisect_left(pub_insts, t_start, key=lambda x: x.timestamp)
i_overlap_end = bisect(pub_insts, t_end, key=lambda x: x.timestamp)
cb_id_to_cb = {cb.id: cb for cb in cbs} for i in range(i_overlap_begin, i_overlap_end):
_map_args = [(cb.id, [inst_runtime_interval(inst) for inst in cb.callback_instances], pub_timestamps) for cb in cbs] pub_cb_overlaps[i].add(cb)
with Pool() as p:
cb_wise_overlaps = p.map(_map_cb_times, _map_args)
pub_cb_overlaps = {i: set() for i in range(len(pub_timestamps))}
for overlap_dict in cb_wise_overlaps:
for i, cb_ids in overlap_dict.items():
cbs = [cb_id_to_cb[cb_id] for cb_id in cb_ids]
pub_cb_overlaps[i].update(cbs)
pub_cbs = set() pub_cbs = set()
cb_cb_overlaps = set() cb_cb_overlaps = set()
@ -168,21 +136,24 @@ def _get_cb_topic_deps(nodes_to_cbs: Dict[TrNode, Set[TrCallbackObject]]):
# For topics published to during the runtime of the callback's instances, # For topics published to during the runtime of the callback's instances,
# assume that they are published by the callback # assume that they are published by the callback
cbs_publishing_topic: Dict[TrTopic, Set[TrCallbackObject]] = {} cbs_publishing_topic: Dict[TrTopic, Set[TrCallbackObject]] = {}
p = tqdm(desc="Processing node publications", total=len(nodes_to_cbs)) cb_publishers: Dict[TrCallbackObject, Set[TrPublisher]] = {}
for node, cbs in nodes_to_cbs.items(): for node, cbs in tqdm(nodes_to_cbs.items(), desc="Processing node publications"):
p.update()
if node is None: if node is None:
continue continue
for pub in node.publishers: for pub in node.publishers:
if any(f in pub.topic_name for f in TOPIC_FILTERS): if any(f in pub.topic_name for f in TOPIC_FILTERS):
continue continue
pub_cbs = _get_publishing_cbs(cbs, pub) pub_cbs = _get_publishing_cbs(cbs, pub)
for cb in pub_cbs:
if cb not in cb_publishers:
cb_publishers[cb] = set()
cb_publishers[cb].add(pub)
if pub.topic not in cbs_publishing_topic: if pub.topic not in cbs_publishing_topic:
cbs_publishing_topic[pub.topic] = set() cbs_publishing_topic[pub.topic] = set()
cbs_publishing_topic[pub.topic].update(pub_cbs) cbs_publishing_topic[pub.topic].update(pub_cbs)
return cbs_subbed_to_topic, cbs_publishing_topic return cbs_subbed_to_topic, cbs_publishing_topic, cb_publishers
@dataclass @dataclass
@ -224,6 +195,7 @@ class LGHierarchyLevel:
class LGEdge: class LGEdge:
start: LGCallback start: LGCallback
end: LGCallback end: LGCallback
topic: TrTopic
@dataclass @dataclass
@ -231,6 +203,9 @@ class LatencyGraph:
top_node: LGHierarchyLevel top_node: LGHierarchyLevel
edges: List[LGEdge] edges: List[LGEdge]
cb_pubs: Dict[TrCallbackObject, Set[TrPublisher]]
pub_cbs: Dict[TrPublisher, Set[TrCallbackObject]]
def __init__(self, tr: TrContext): def __init__(self, tr: TrContext):
################################################## ##################################################
# Annotate nodes with their callbacks # Annotate nodes with their callbacks
@ -238,9 +213,7 @@ class LatencyGraph:
# Note that nodes can also be None! # Note that nodes can also be None!
nodes_to_cbs = {} nodes_to_cbs = {}
p = tqdm(desc="Finding CB nodes", total=len(tr.callback_objects)) for cb in tqdm(tr.callback_objects, desc="Finding CB nodes"):
for cb in tr.callback_objects.values():
p.update()
node = _get_cb_owner_node(cb) node = _get_cb_owner_node(cb)
if node not in nodes_to_cbs: if node not in nodes_to_cbs:
@ -251,31 +224,23 @@ class LatencyGraph:
# Find in/out topics for each callback # Find in/out topics for each callback
################################################## ##################################################
cbs_subbed_to_topic, cbs_publishing_topic = _get_cb_topic_deps(nodes_to_cbs) cbs_subbed_to_topic, cbs_publishing_topic, cb_pubs = _get_cb_topic_deps(nodes_to_cbs)
pub_cbs = {}
for cb, pubs in cb_pubs.items():
for pub in pubs:
if pub not in pub_cbs:
pub_cbs[pub] = set()
pub_cbs[pub].add(cb)
################################################## self.cb_pubs = cb_pubs
# Map topics to their messages self.pub_cbs = pub_cbs
##################################################
topics_to_messages = {}
p = tqdm(desc="Mapping messages to topics", total=len(tr.publish_instances))
for pub_inst in tr.publish_instances:
p.update()
try:
topic = pub_inst.publisher.topic
except KeyError:
continue
if topic not in topics_to_messages:
topics_to_messages[topic] = []
topics_to_messages[topic].append(pub_inst)
################################################## ##################################################
# Define nodes and edges on lowest level # Define nodes and edges on lowest level
################################################## ##################################################
input = LGCallback("INPUT", [], [topic for topic in tr.topics.values() if not topic.publishers]) input = LGCallback("INPUT", [], [topic for topic in tr.topics if not topic.publishers])
output = LGCallback("OUTPUT", [topic for topic in tr.topics.values() if not topic.subscriptions], []) output = LGCallback("OUTPUT", [topic for topic in tr.topics if not topic.subscriptions], [])
in_node = LGHierarchyLevel(None, [], "INPUT", [input]) in_node = LGHierarchyLevel(None, [], "INPUT", [input])
out_node = LGHierarchyLevel(None, [], "OUTPUT", [output]) out_node = LGHierarchyLevel(None, [], "OUTPUT", [output])
@ -284,17 +249,17 @@ class LatencyGraph:
tr_to_lg_cb = {} tr_to_lg_cb = {}
p = tqdm("Building graph nodes", total=sum(map(len, nodes_to_cbs.values()))) p = tqdm(desc="Building graph nodes", total=sum(map(len, nodes_to_cbs.values())))
for node, cbs in nodes_to_cbs.items(): for node, cbs in nodes_to_cbs.items():
node_callbacks = [] node_callbacks = []
for cb in cbs: for cb in cbs:
p.update() p.update()
try:
sym = cb.callback_symbol sym = cb.callback_symbol
if sym is not None:
pretty_sym = sanitize(sym.symbol) pretty_sym = sanitize(sym.symbol)
except KeyError: else:
sym = None
pretty_sym = cb.id pretty_sym = cb.id
in_topics = [topic for topic, cbs in cbs_subbed_to_topic.items() if cb in cbs] in_topics = [topic for topic, cbs in cbs_subbed_to_topic.items() if cb in cbs]
out_topics = [topic for topic, cbs in cbs_publishing_topic.items() if cb in cbs] out_topics = [topic for topic, cbs in cbs_publishing_topic.items() if cb in cbs]
@ -306,15 +271,13 @@ class LatencyGraph:
lg_nodes.append(lg_node) lg_nodes.append(lg_node)
edges = [] edges = []
p = tqdm("Building graph edges", total=len(tr.topics)) for topic in tqdm(tr.topics, desc="Building graph edges"):
for topic in tr.topics.values():
p.update()
sub_cbs = cbs_subbed_to_topic[topic] if topic in cbs_subbed_to_topic else [] sub_cbs = cbs_subbed_to_topic[topic] if topic in cbs_subbed_to_topic else []
pub_cbs = cbs_publishing_topic[topic] if topic in cbs_publishing_topic else [] pub_cbs = cbs_publishing_topic[topic] if topic in cbs_publishing_topic else []
for sub_cb in sub_cbs: for sub_cb in sub_cbs:
for pub_cb in pub_cbs: for pub_cb in pub_cbs:
lg_edge = LGEdge(tr_to_lg_cb[pub_cb], tr_to_lg_cb[sub_cb]) lg_edge = LGEdge(tr_to_lg_cb[pub_cb], tr_to_lg_cb[sub_cb], topic)
edges.append(lg_edge) edges.append(lg_edge)
self.edges = edges self.edges = edges
@ -324,6 +287,3 @@ class LatencyGraph:
################################################## ##################################################
self.top_node = _hierarchize(lg_nodes) self.top_node = _hierarchize(lg_nodes)
def to_gv(self):
pass

View file

@ -0,0 +1,31 @@
from dataclasses import dataclass
from typing import List
from tracing_interop.tr_types import TrPublishInstance, TrCallbackInstance
@dataclass
class DepTree:
head: TrCallbackInstance | TrPublishInstance
deps: List['DepTree']
def depth(self):
return 1 + max(map(DepTree.depth, self.deps), default=0)
def size(self):
return 1 + sum(map(DepTree.size, self.deps))
def fanout(self):
if not self.deps:
return 1
return sum(map(DepTree.fanout, self.deps))
def e2e_lat(self):
return self.head.timestamp - self.critical_path()[-1].timestamp
def critical_path(self):
if not self.deps:
return [self.head]
return [self.head, *min(map(DepTree.critical_path, self.deps), key=lambda ls: ls[-1].timestamp)]

View file

@ -73,7 +73,7 @@ def cached(name, function, file_deps: List[str]):
if pkl_time > dep_time: if pkl_time > dep_time:
with open(pkl_filename, "rb") as f: with open(pkl_filename, "rb") as f:
print(f"[CACHE] Found up-to-date cache entry for {name}, loading.") print(f"[CACHE] Found up-to-date cache entry ({pkl_filename}) for {name}, loading.")
return pickle.load(f) return pickle.load(f)
if os.path.exists(pkl_filename): if os.path.exists(pkl_filename):

View file

@ -4,4 +4,5 @@ matplotlib
pyvis pyvis
graphviz graphviz
ruamel.yaml ruamel.yaml
fuzzywuzzy blist @ git+https://github.com/mojomex/blist.git@47724cbc4137ddfb685f9711e950fb82587bf971

File diff suppressed because it is too large Load diff

View file

@ -1,100 +1,108 @@
from collections import namedtuple, UserList
from dataclasses import dataclass, field from dataclasses import dataclass, field
from functools import cached_property from typing import List, Dict, Optional, Set, TypeVar, Generic, Iterable
from typing import List, Dict import bisect
import pandas as pd
from tqdm.notebook import tqdm
from tracetools_analysis.processor.ros2 import Ros2Handler from tracetools_analysis.processor.ros2 import Ros2Handler
from tracetools_analysis.utils.ros2 import Ros2DataModelUtil
from .utils import list_to_dict, df_to_type_list from .utils import df_to_type_list
IdxItemType = TypeVar("IdxItemType")
Timestamp = namedtuple("Timestamp", ["timestamp"])
class Index(Generic[IdxItemType]):
def __init__(self, items: Iterable[IdxItemType], **idx_fields):
sort_key = lambda item: item.timestamp
self.__items = list(items)
self.__items.sort(key=sort_key)
self.__indices = {}
for idx_name, is_multi in idx_fields.items():
index = {}
self.__indices[idx_name] = index
if is_multi:
for item in self.__items:
key = getattr(item, idx_name)
if key not in index:
index[key] = []
index[key].append(item) # Also sorted since items are processed in order and only filtered here
else:
for item in self.__items:
key = getattr(item, idx_name)
if key in index:
print(repr(ValueError(f"Duplicate key: {idx_name}={key}; old={index[key]}; new={item}")))
index[key] = item
def __iter__(self):
return iter(self.__items)
def __len__(self):
return len(self.__items)
def __getattr__(self, item: str):
if not item.startswith("by_"):
return AttributeError(
f"Not found in index: '{item}'. Index lookups must be of the shape 'by_<index_field>'.")
return self.__indices[item.removeprefix("by_")]
def __getstate__(self):
return vars(self)
def __setstate__(self, state):
vars(self).update(state)
@dataclass @dataclass
class TrContext: class TrContext:
nodes: Dict[int, 'TrNode'] nodes: Index['TrNode']
publishers: Dict[int, 'TrPublisher'] publishers: Index['TrPublisher']
subscriptions: Dict[int, 'TrSubscription'] subscriptions: Index['TrSubscription']
timers: Dict[int, 'TrTimer'] timers: Index['TrTimer']
timer_node_links: Dict[int, 'TrTimerNodeLink'] timer_node_links: Index['TrTimerNodeLink']
subscription_objects: Dict[int, 'TrSubscriptionObject'] subscription_objects: Index['TrSubscriptionObject']
callback_objects: Dict[int, 'TrCallbackObject'] callback_objects: Index['TrCallbackObject']
callback_symbols: Dict[int, 'TrCallbackSymbol'] callback_symbols: Index['TrCallbackSymbol']
publish_instances: List['TrPublishInstance'] publish_instances: Index['TrPublishInstance']
callback_instances: List['TrCallbackInstance'] callback_instances: Index['TrCallbackInstance']
topics: Dict[str, 'TrTopic'] topics: Index['TrTopic']
util: Ros2DataModelUtil | None
handler: Ros2Handler | None
def __init__(self, util: Ros2DataModelUtil, handler: Ros2Handler):
self.util = util
self.handler = handler
def __init__(self, handler: Ros2Handler):
print("[TrContext] Processing ROS 2 objects from traces...") print("[TrContext] Processing ROS 2 objects from traces...")
self.nodes = list_to_dict(df_to_type_list(handler.data.nodes, TrNode, _c=self)) self.nodes = Index(df_to_type_list(handler.data.nodes, TrNode, _c=self),
print(f" ├─ Processed {len(self.nodes):<8d} nodes") id=False)
self.publishers = list_to_dict(df_to_type_list(handler.data.rcl_publishers, TrPublisher, _c=self)) self.publishers = Index(df_to_type_list(handler.data.rcl_publishers, TrPublisher, _c=self),
print(f" ├─ Processed {len(self.publishers):<8d} publishers") id=False, node_handle=True, topic_name=True)
self.subscriptions = list_to_dict(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription, _c=self)) self.subscriptions = Index(df_to_type_list(handler.data.rcl_subscriptions, TrSubscription, _c=self),
print(f" ├─ Processed {len(self.subscriptions):<8d} subscriptions") id=False, node_handle=True, topic_name=True)
self.timers = list_to_dict(df_to_type_list(handler.data.timers, TrTimer, _c=self)) self.timers = Index(df_to_type_list(handler.data.timers, TrTimer, _c=self),
print(f" ├─ Processed {len(self.timers):<8d} timers") id=False)
self.timer_node_links = list_to_dict(df_to_type_list(handler.data.timer_node_links, TrTimerNodeLink)) self.timer_node_links = Index(df_to_type_list(handler.data.timer_node_links, TrTimerNodeLink),
print(f" ├─ Processed {len(self.timer_node_links):<8d} timer-node links") id=False, node_handle=True)
self.subscription_objects = list_to_dict( self.subscription_objects = Index(
df_to_type_list(handler.data.subscription_objects, TrSubscriptionObject, _c=self)) df_to_type_list(handler.data.subscription_objects, TrSubscriptionObject, _c=self),
print(f" ├─ Processed {len(self.subscription_objects):<8d} subscription objects") id=False, subscription_handle=False)
self.callback_objects = list_to_dict(df_to_type_list(handler.data.callback_objects, TrCallbackObject, _c=self)) self.callback_objects = Index(df_to_type_list(handler.data.callback_objects, TrCallbackObject, _c=self),
print(f" ├─ Processed {len(self.callback_objects):<8d} callback objects") id=False, callback_object=False)
self.callback_symbols = list_to_dict(df_to_type_list(handler.data.callback_symbols, TrCallbackSymbol, _c=self)) self.callback_symbols = Index(df_to_type_list(handler.data.callback_symbols, TrCallbackSymbol, _c=self),
print(f" ├─ Processed {len(self.callback_symbols):<8d} callback symbols") id=False)
self.publish_instances = df_to_type_list(handler.data.rcl_publish_instances, TrPublishInstance, _c=self) self.publish_instances = Index(df_to_type_list(handler.data.rcl_publish_instances, TrPublishInstance, _c=self,
print(f" ├─ Processed {len(self.publish_instances):<8d} publish instances") mappers={"timestamp": lambda t: t * 1e-9}),
self.callback_instances = df_to_type_list(handler.data.callback_instances, TrCallbackInstance, _c=self) publisher_handle=True)
print(f" ├─ Processed {len(self.callback_instances):<8d} callback instances") self.callback_instances = Index(df_to_type_list(handler.data.callback_instances, TrCallbackInstance, _c=self,
mappers={"timestamp": lambda t: t.timestamp(),
"duration": lambda d: d.total_seconds()}),
callback_object=True)
_unique_topic_names = {*(pub.topic_name for pub in self.publishers.values()), _unique_topic_names = {*(pub.topic_name for pub in self.publishers),
*(sub.topic_name for sub in self.subscriptions.values())} *(sub.topic_name for sub in self.subscriptions)}
self.topics = list_to_dict(map(lambda name: TrTopic(name=name, _c=self), _unique_topic_names), key="name")
print(f" └─ Processed {len(self.topics):<8d} topics\n")
print("[TrContext] Caching dynamic properties...") self.topics = Index((TrTopic(name=name, _c=self) for name in _unique_topic_names),
name=False)
p = tqdm(desc=" ├─ Processing nodes", total=len(self.nodes.values()))
[(o.path, o.publishers, o.subscriptions, o.timers, p.update()) for o in self.nodes.values()]
print(" ├─ Cached node properties")
p = tqdm(desc=" ├─ Processing publishers", total=len(self.publishers.values()))
[(o.instances, o.subscriptions, p.update()) for o in self.publishers.values()]
print(" ├─ Cached publisher properties")
p = tqdm(desc=" ├─ Processing subscriptions", total=len(self.subscriptions.values()))
[(o.publishers, o.subscription_objects, p.update()) for o in self.subscriptions.values()]
print(" ├─ Cached subscription properties")
p = tqdm(desc=" ├─ Processing timers", total=len(self.timers.values()))
[(o.nodes, p.update()) for o in self.timers.values()]
print(" ├─ Cached timer properties")
p = tqdm(desc=" ├─ Processing CB objects", total=len(self.callback_objects.values()))
[(o.callback_instances, o.owner, p.update()) for o in self.callback_objects.values()]
print(" ├─ Cached callback object properties")
p = tqdm(desc=" ├─ Processing CB symbols", total=len(self.callback_symbols.values()))
[(o.callback_objs, p.update()) for o in self.callback_symbols.values()]
print(" ├─ Cached callback symbol properties")
p = tqdm(desc=" ├─ Processing topics", total=len(self.topics.values()))
[(o.publishers, o.subscriptions, p.update()) for o in self.topics.values()]
print(" └─ Cached topic properties\n")
def __getstate__(self):
state = self.__dict__.copy()
del state["util"]
del state["handler"]
return state
def __setstate__(self, state):
self.__dict__.update(state)
self.util = None
self.handler = None
def __repr__(self): def __repr__(self):
return f"TrContext" return f"TrContext"
@ -110,26 +118,30 @@ class TrNode:
namespace: str namespace: str
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@cached_property @property
def path(self) -> str: def path(self) -> str:
return '/'.join((self.namespace, self.name)).replace('//', '/') return '/'.join((self.namespace, self.name)).replace('//', '/')
@cached_property @property
def publishers(self) -> List['TrPublisher']: def publishers(self) -> List['TrPublisher']:
return list(filter(lambda pub: pub.node_handle == self.id, self._c.publishers.values())) return self._c.publishers.by_node_handle.get(self.id) or []
@cached_property @property
def subscriptions(self) -> List['TrSubscription']: def subscriptions(self) -> List['TrSubscription']:
return list(filter(lambda sub: sub.node_handle == self.id, self._c.subscriptions.values())) return self._c.subscriptions.by_node_handle.get(self.id) or []
@cached_property @property
def timers(self) -> List['TrTimer']: def timers(self) -> List['TrTimer']:
links = [link.id for link in self._c.timer_node_links.values() if link.node_handle == self.id] links = self._c.timer_node_links.by_node_handle.get(self.id) or []
return list(filter(lambda timer: timer.id in links, self._c.timers.values())) timers = [self._c.timers.by_id.get(link.id) for link in links]
return [t for t in timers if t is not None]
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrPublisher: class TrPublisher:
@ -142,24 +154,27 @@ class TrPublisher:
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@property @property
def node(self) -> 'TrNode': def node(self) -> Optional['TrNode']:
return self._c.nodes[self.node_handle] return self._c.nodes.by_id.get(self.node_handle)
@cached_property
def subscriptions(self) -> List['TrSubscription']:
return list(filter(lambda sub: sub.topic_name == self.topic_name, self._c.subscriptions.values()))
@cached_property
def instances(self) -> List['TrPublishInstance']:
return list(filter(lambda inst: inst.publisher_handle == self.id, self._c.publish_instances))
@property @property
def topic(self) -> 'TrTopic': def subscriptions(self) -> List['TrSubscription']:
return self._c.topics[self.topic_name] return self._c.subscriptions.by_topic_name.get(self.topic_name) or []
@property
def instances(self) -> List['TrPublishInstance']:
return self._c.publish_instances.by_publisher_handle.get(self.id) or []
@property
def topic(self) -> Optional['TrTopic']:
return self._c.topics.by_name.get(self.topic_name)
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrSubscription: class TrSubscription:
@ -172,25 +187,27 @@ class TrSubscription:
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@property @property
def node(self) -> 'TrNode': def node(self) -> Optional['TrNode']:
return self._c.nodes[self.node_handle] return self._c.nodes.by_id.get(self.node_handle)
@cached_property
def publishers(self) -> List['TrPublisher']:
return list(filter(lambda pub: pub.topic_name == self.topic_name, self._c.publishers.values()))
@cached_property
def subscription_objects(self) -> List['TrSubscriptionObject']:
return list(
filter(lambda sub_obj: sub_obj.subscription_handle == self.id, self._c.subscription_objects.values()))
@property @property
def topic(self) -> 'TrTopic': def publishers(self) -> List['TrPublisher']:
return self._c.topics[self.topic_name] return self._c.publishers.by_topic_name.get(self.topic_name) or []
@property
def subscription_object(self) -> Optional['TrSubscriptionObject']:
return self._c.subscription_objects.by_subscription_handle.get(self.id)
@property
def topic(self) -> Optional['TrTopic']:
return self._c.topics.by_name.get(self.topic_name)
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrTimer: class TrTimer:
@ -200,18 +217,23 @@ class TrTimer:
tid: int tid: int
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@cached_property @property
def nodes(self) -> List['TrNode']: def node(self) -> Optional['TrNode']:
links = [link.node_handle for link in self._c.timer_node_links.values() if link.id == self.id] link = self._c.timer_node_links.by_id.get(self.id)
return list(filter(lambda node: node.id in links, self._c.nodes.values())) if link is None:
return None
return self._c.nodes.by_id.get(link.node_handle)
@property @property
def callback_object(self) -> 'TrCallbackObject': def callback_object(self) -> Optional['TrCallbackObject']:
return self._c.callback_objects[self.id] return self._c.callback_objects.by_id.get(self.id)
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrTimerNodeLink: class TrTimerNodeLink:
@ -219,25 +241,34 @@ class TrTimerNodeLink:
timestamp: int timestamp: int
node_handle: int node_handle: int
def __hash__(self):
return hash((self.id, self.node_handle))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrSubscriptionObject: class TrSubscriptionObject:
id: int # subscription id: int
timestamp: int timestamp: int
subscription_handle: int subscription_handle: int
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@property @property
def subscription(self) -> 'TrSubscription': def subscription(self) -> Optional['TrSubscription']:
return self._c.subscriptions[self.subscription_handle] return self._c.subscriptions.by_id.get(self.subscription_handle)
@property @property
def callback_object(self) -> 'TrCallbackObject': def callback_object(self) -> Optional['TrCallbackObject']:
return self._c.callback_objects[self.id] return self._c.callback_objects.by_id.get(self.id)
def __hash__(self): def __hash__(self):
return hash((self.id, self.timestamp, self.subscription_handle)) return hash((self.id, self.timestamp, self.subscription_handle))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrCallbackObject: class TrCallbackObject:
@ -246,62 +277,67 @@ class TrCallbackObject:
callback_object: int callback_object: int
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@cached_property @property
def callback_instances(self) -> List['TrCallbackInstance']: def callback_instances(self) -> List['TrCallbackInstance']:
return list(filter(lambda inst: inst.callback_object == self.callback_object, self._c.callback_instances)) return self._c.callback_instances.by_callback_object.get(self.callback_object) or []
@property @property
def callback_symbol(self) -> 'TrCallbackSymbol': def callback_symbol(self) -> Optional['TrCallbackSymbol']:
return self._c.callback_symbols[self.id] return self._c.callback_symbols.by_id.get(self.callback_object)
@cached_property @property
def owner(self): def owner(self):
if self.id in self._c.timers: if self.id in self._c.timers.by_id:
return self._c.timers[self.id] return self._c.timers.by_id[self.id]
if self.id in self._c.publishers: if self.id in self._c.publishers.by_id:
return self._c.publishers[self.id] return self._c.publishers.by_id[self.id]
if self.id in self._c.subscription_objects: if self.id in self._c.subscription_objects.by_id:
return self._c.subscription_objects[self.id] return self._c.subscription_objects.by_id[self.id]
if self.id in self._c.handler.data.services.index:
return 'Service'
if self.id in self._c.handler.data.clients.index:
return 'Client'
return None return None
def __hash__(self): def __hash__(self):
return hash((self.id, self.timestamp, self.callback_object)) return hash((self.id, self.timestamp, self.callback_object))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrPublishInstance: class TrPublishInstance:
publisher_handle: int publisher_handle: int
timestamp: int timestamp: float
message: int message: int
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@property @property
def publisher(self) -> 'TrPublisher': def publisher(self) -> Optional['TrPublisher']:
return self._c.publishers[self.publisher_handle] return self._c.publishers.by_id.get(self.publisher_handle)
def __hash__(self): def __hash__(self):
return hash((self.publisher_handle, self.timestamp, self.message)) return hash((self.publisher_handle, self.timestamp, self.message))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrCallbackInstance: class TrCallbackInstance:
callback_object: int callback_object: int
timestamp: pd.Timestamp timestamp: float
duration: pd.Timedelta duration: float
intra_process: bool intra_process: bool
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@property @property
def callback_obj(self) -> 'TrCallbackObject': def callback_obj(self) -> Optional['TrCallbackObject']:
return self._c.callback_objects[self.callback_object] return self._c.callback_objects.by_callback_object.get(self.callback_object)
def __hash__(self): def __hash__(self):
return hash((self.callback_object, self.timestamp, self.duration)) return hash((self.callback_object, self.timestamp, self.duration))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
@dataclass @dataclass
class TrCallbackSymbol: class TrCallbackSymbol:
@ -310,13 +346,16 @@ class TrCallbackSymbol:
symbol: str symbol: str
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
@cached_property @property
def callback_objs(self) -> List['TrCallbackObject']: def callback_obj(self) -> Optional['TrCallbackObject']:
return list(filter(lambda cb_obj: cb_obj.callback_object == self.id, self._c.callback_objects.values())) return self._c.callback_objects.by_callback_object.get(self.id)
def __hash__(self): def __hash__(self):
return hash((self.id, self.timestamp, self.symbol)) return hash((self.id, self.timestamp, self.symbol))
def __eq__(self, other):
return self.__hash__() == other.__hash__()
####################################### #######################################
# Self-defined (not from ROS2DataModel) # Self-defined (not from ROS2DataModel)
@ -326,14 +365,18 @@ class TrCallbackSymbol:
class TrTopic: class TrTopic:
name: str name: str
_c: TrContext = field(repr=False) _c: TrContext = field(repr=False)
timestamp: int = 0
@cached_property @property
def publishers(self) -> List['TrPublisher']: def publishers(self) -> List['TrPublisher']:
return list(filter(lambda pub: pub.topic_name == self.name, self._c.publishers.values())) return self._c.publishers.by_topic_name.get(self.name) or []
@cached_property @property
def subscriptions(self) -> List['TrSubscription']: def subscriptions(self) -> List['TrSubscription']:
return list(filter(lambda sub: sub.topic_name == self.name, self._c.subscriptions.values())) return self._c.subscriptions.by_topic_name.get(self.name) or []
def __hash__(self): def __hash__(self):
return hash(self.name) return hash(self.name)
def __eq__(self, other):
return self.__hash__() == other.__hash__()

View file

@ -8,27 +8,18 @@ def row_to_type(row, type, **type_kwargs):
return type(**row, **type_kwargs) return type(**row, **type_kwargs)
def df_to_type_list(df, type, **type_kwargs): def df_to_type_list(df, type, mappers=None, **type_kwargs):
if mappers is not None:
for col, mapper in mappers.items():
df[col] = df[col].map(mapper)
has_idx = not isinstance(df.index, pd.RangeIndex) has_idx = not isinstance(df.index, pd.RangeIndex)
ret_list = [] ret_list = []
p = tqdm(desc=" ├─ Processing", total=len(df)) i=0
for row in df.itertuples(index=has_idx): for row in tqdm(df.itertuples(index=has_idx), desc=f" ├─ Processing {type.__name__}s", total=len(df)):
p.update()
row_dict = row._asdict() row_dict = row._asdict()
if has_idx: if has_idx:
row_dict["id"] = row.Index row_dict["id"] = row.Index
del row_dict["Index"] del row_dict["Index"]
ret_list.append(row_to_type(row_dict, type, **type_kwargs)) ret_list.append(row_to_type(row_dict, type, **type_kwargs))
return ret_list return ret_list
def by_index(df, index, type):
return df_to_type_list(df.loc[index], type)
def by_column(df, column_name, column_val, type):
return df_to_type_list(df[df[column_name] == column_val], type)
def list_to_dict(ls, key='id'):
return {getattr(item, key): item for item in ls}