In [1]:
import os
import sys
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from IPython.display import display, clear_output

sys.path.append("../autoware/build/tracetools_read/")
sys.path.append("../autoware/build/tracetools_analysis/")
from tracetools_read.trace import *
from tracetools_analysis.loading import load_file
from tracetools_analysis.processor.ros2 import Ros2Handler
from tracetools_analysis.utils.ros2 import Ros2DataModelUtil

from dataclasses import dataclass
from typing import List, Dict, Set, Union, Tuple
from functools import cached_property
import pickle
import re

from utils import ProgressPrinter

In [2]:
def pkl_filename_from_file_timestamp(file_path):
    if os.path.exists(file_path):
        timestamp = os.path.getmtime(file_path)
        pkl_filename = f"ros_objects_{hash(timestamp)}.pkl"
        return pkl_filename, os.path.exists(pkl_filename)
    return None, False

path = os.path.expanduser("~/Downloads/autoware-trace/ust")
path_converted = os.path.join(path, 'converted')
pkl_filename, pkl_exists = pkl_filename_from_file_timestamp(path_converted)

if not pkl_exists:
    file = load_file(path)
    handler = Ros2Handler.process(file)
    util = Ros2DataModelUtil(handler)
    pkl_filename, pkl_exists = pkl_filename_from_file_timestamp(path_converted)

In [3]:
if False:
    n=3
    self = handler.data
    print('====================ROS 2 DATA MODEL===================')
    print('██ Contexts: ██')
    print(self.contexts[:n].to_string())
    print('██ Nodes: ██')
    print(self.nodes[:n].to_string())
    print('██ Publishers (rmw): ██')
    print(self.rmw_publishers[:n].to_string())
    print('██ Publishers (rcl): ██')
    print(self.rcl_publishers[:n].to_string())
    print('██ Subscriptions (rmw): ██')
    print(self.rmw_subscriptions[:n].to_string())
    print('██ Subscriptions (rcl): ██')
    print(self.rcl_subscriptions[:n].to_string())
    print('██ Subscription objects: ██')
    print(self.subscription_objects[:n].to_string())
    print('██ Services: ██')
    print(self.services[:n].to_string())
    print('██ Clients: ██')
    print(self.clients[:n].to_string())
    print('██ Timers: ██')
    print(self.timers[:n].to_string())
    print('██ Timer-node links: ██')
    print(self.timer_node_links[:n].to_string())
    print('██ Callback objects: ██')
    print(self.callback_objects[:n].to_string())
    print('██ Callback symbols: ██')
    print(self.callback_symbols[:n].to_string())
    print('██ Callback instances: ██')
    print(self.callback_instances[:n].to_string())
    print('██ Publish instances (rclcpp): ██')
    print(self.rclcpp_publish_instances[:n].to_string())
    print('██ Publish instances (rcl): ██')
    print(self.rcl_publish_instances[:n].to_string())
    print('██ Publish instances (rmw): ██')
    print(self.rmw_publish_instances[:n].to_string())
    print('██ Take instances (rmw): ██')
    print(self.rmw_take_instances[:n].to_string())
    print('██ Take instances (rcl): ██')
    print(self.rcl_take_instances[:n].to_string())
    print('██ Take instances (rclcpp): ██')
    print(self.rclcpp_take_instances[:n].to_string())
    print('██ Lifecycle state machines: ██')
    print(self.lifecycle_state_machines[:n].to_string())
    print('██ Lifecycle transitions: ██')
    print(self.lifecycle_transitions[:n].to_string())
    print('==================================================')

# Data Structures

In [4]:
def str_to_cls(classname):
    return getattr(sys.modules[__name__], classname)

def row_to_type(row, type, has_idx):
    return type(id=row.name, **row) if has_idx else type(**row)

def df_to_type_list(df, type):
    if isinstance(type, str):
        type = str_to_cls(type)
        
    has_idx = not isinstance(df.index, pd.RangeIndex)
    return [row_to_type(row, type, has_idx) for _, row in df.iterrows()]

def by_index(df, index, type):
    return df_to_type_list(df.loc[index], type)

def by_column(df, column_name, column_val, type):
    return df_to_type_list(df[df[column_name] == column_val], type)

def list_to_dict(ls, key='id'):
    return {getattr(item, key): item for item in ls}

#################################
# Predefined (from ROS2DataModel)
#################################

@dataclass
class Node:
    id: int
    timestamp: int
    tid: int
    rmw_handle: int
    name: str
    namespace: str

    @cached_property
    def path(self) -> str:
        return '/'.join((self.namespace, self.name))

    @cached_property
    def publishers(self) -> List['Publisher']:
        return list(filter(lambda pub: pub.node_handle == self.id, publishers.values()))

    @cached_property
    def subscriptions(self) -> List['Subscription']:
        return list(filter(lambda sub: sub.node_handle == self.id, subscriptions.values()))
    
    @cached_property
    def timers(self) -> List['Timer']:
        links = [link.id for link in timer_node_links.values() if link.node_handle == self.id]
        return list(filter(lambda timer: timer.id in links, timers.values()))

    def __hash__(self):
        return hash(self.id)

@dataclass
class Publisher:
    id: int
    timestamp: int
    node_handle: int
    rmw_handle: int
    topic_name: str
    depth: int

    @property
    def node(self) -> 'Node':
        return nodes[self.node_handle]

    @cached_property
    def subscriptions(self) -> List['Subscription']:
        return list(filter(lambda sub: sub.topic_name == self.topic_name, subscriptions.values()))

    @cached_property
    def instances(self) -> List['PublishInstance']:
        return list(filter(lambda inst: inst.publisher_handle == self.id, publish_instances))
    
    @property
    def topic(self) -> 'Topic':
        return topics[self.topic_name]

    def __hash__(self):
        return hash(self.id)


@dataclass
class Subscription:
    id: int
    timestamp: int
    node_handle: int
    rmw_handle: int
    topic_name: str
    depth: int

    @property
    def node(self) -> 'Node':
        return nodes[self.node_handle]

    @cached_property
    def publishers(self) -> List['Publisher']:
        return list(filter(lambda pub: pub.topic_name == self.topic_name, publishers.values()))
    
    @cached_property
    def subscription_objects(self) -> 'SubscriptionObject':
        return list(filter(lambda sub_obj: sub_obj.subscription_handle == self.id, subscription_objects.values()))

    @property
    def topic(self) -> 'Topic':
        return topics[self.topic_name]

    def __hash__(self):
        return hash(self.id)
    
@dataclass
class Timer:
    id: int
    timestamp: int
    period: int
    tid: int

    @cached_property
    def nodes(self) -> List['Node']:
        links = [link.node_handle for link in timer_node_links.values() if link.id == self.id]
        return list(filter(lambda node: node.id in links, nodes.values()))
    
    @property
    def callback_object(self) -> 'CallbackObject':
        return callback_objects[self.id]

    def __hash__(self):
        return hash(self.id)

@dataclass
class TimerNodeLink:
    id: int
    timestamp: int
    node_handle: int

@dataclass
class SubscriptionObject:
    id: int             # subscription
    timestamp: int
    subscription_handle: int

    @property
    def subscription(self) -> 'Subscription':
        return subscriptions[self.subscription_handle]

    @property
    def callback_object(self) -> 'CallbackObject':
        return callback_objects[self.id]

    def __hash__(self):
        return hash((self.id, self.timestamp, self.subscription_handle))

@dataclass
class CallbackObject:
    id: int             # (reference) = subscription_object.id | timer.id | ....
    timestamp: int
    callback_object: int

    @cached_property
    def callback_instances(self) -> List['CallbackInstance']:
        return list(filter(lambda inst: inst.callback_object == self.callback_object, callback_instances))

    @cached_property
    def owner(self):
        if self.id in timers:
            return timers[self.id]
        if self.id in publishers:
            return publishers[self.id]
        if self.id in subscription_objects:
            return subscription_objects[self.id]
        if self.id in handler.data.services.index:
            return 'Service'
        if self.id in handler.data.clients.index:
            return 'Client'
        return None

    @cached_property
    def owner_info(self):
        info = util.get_callback_owner_info(self.callback_object)
        if info is None:
            return None, None
        
        type_name, dict_str = info.split(" -- ")
        kv_strs = dict_str.split(", ")
        info_dict = {k: v for k, v in map(lambda kv_str: kv_str.split(": ", maxsplit=1), kv_strs)}
        return type_name, info_dict

    def __hash__(self):
        return hash((self.id, self.timestamp, self.callback_object))

@dataclass
class PublishInstance:
    publisher_handle: int
    timestamp: int
    message: int

    @property
    def publisher(self) -> 'Publisher':
        return publishers[self.publisher_handle]

    def __hash__(self):
        return hash((self.publisher_handle, self.timestamp, self.message))

@dataclass
class CallbackInstance:
    callback_object: int
    timestamp: pd.Timestamp
    duration: pd.Timedelta
    intra_process: bool

    @property
    def callback_obj(self) -> 'CallbackObject':
        return callback_objects[self.callback_object]

    def __hash__(self):
        return hash((self.callback_object, self.timestamp, self.duration))

@dataclass
class CallbackSymbol:
    id: int  # callback_object
    timestamp: int
    symbol: str

    @cached_property
    def callback_objs(self) -> List['CallbackObject']:
        return list(filter(lambda cb_obj: cb_obj.callback_object == self.id, callback_objects.values()))

    def __hash__(self):
        return hash((self.id, self.timestamp, self.symbol))


#######################################
# Self-defined (not from ROS2DataModel)
#######################################

@dataclass
class Topic:
    name: str

    @cached_property
    def publishers(self) -> List['Publisher']:
        return list(filter(lambda pub: pub.topic_name == self.name, publishers.values()))
    
    @cached_property
    def subscriptions(self) -> List['Subscription']:
        return list(filter(lambda sub: sub.topic_name == self.name, subscriptions.values()))

    def __hash__(self):
        return hash(self.name)


In [5]:
if not pkl_exists:
    print("Did not find pickled ROS objects, extracting...")
    #######################################
    # Instantiate collections
    #######################################

    nodes:                  Dict[int, 'Node']               = list_to_dict(df_to_type_list(handler.data.nodes,                  'Node'));               print(f"Processed {len(nodes):<8d} nodes")
    publishers:             Dict[int, 'Publisher']          = list_to_dict(df_to_type_list(handler.data.rcl_publishers,         'Publisher'));          print(f"Processed {len(publishers):<8d} publishers")
    subscriptions:          Dict[int, 'Subscription']       = list_to_dict(df_to_type_list(handler.data.rcl_subscriptions,      'Subscription'));       print(f"Processed {len(subscriptions):<8d} subscriptions")
    timers:                 Dict[int, 'Timer']              = list_to_dict(df_to_type_list(handler.data.timers,                 'Timer'));              print(f"Processed {len(timers):<8d} timers")
    timer_node_links:       Dict[int, 'TimerNodeLink']      = list_to_dict(df_to_type_list(handler.data.timer_node_links,       'TimerNodeLink'));      print(f"Processed {len(timer_node_links):<8d} timer-node links")
    subscription_objects:   Dict[int, 'SubscriptionObject'] = list_to_dict(df_to_type_list(handler.data.subscription_objects,   'SubscriptionObject')); print(f"Processed {len(subscription_objects):<8d} subscription objects")
    callback_objects:       Dict[int, 'CallbackObject']     = list_to_dict(df_to_type_list(handler.data.callback_objects,       'CallbackObject'));     print(f"Processed {len(callback_objects):<8d} callback objects")
    callback_symbols:       Dict[int, 'CallbackSymbol']     = list_to_dict(df_to_type_list(handler.data.callback_symbols,       'CallbackSymbol'));     print(f"Processed {len(callback_symbols):<8d} callback symbols")
    publish_instances:      List['PublishInstance']         =              df_to_type_list(handler.data.rcl_publish_instances,  'PublishInstance');     print(f"Processed {len(publish_instances):<8d} publish instances")
    callback_instances:     List['CallbackInstance']        =              df_to_type_list(handler.data.callback_instances,     'CallbackInstance');    print(f"Processed {len(callback_instances):<8d} callback instances")

    _unique_topic_names = {*(pub.topic_name for pub in publishers.values()), *(sub.topic_name for sub in subscriptions.values())}
    topics: Dict[str, 'Topic'] = list_to_dict(map(Topic, _unique_topic_names), key="name"); print(f"Processed {len(topics):<8d} topics")

    print("Caching dynamic properties...")

    [(o.path, o.publishers, o.subscriptions, o.timers) for o in nodes.values()]         ; print("Cached node properties")
    [(o.instances, o.subscriptions) for o in publishers.values()]                       ; print("Cached publisher properties")
    [(o.publishers, o.subscription_objects) for o in subscriptions.values()]             ; print("Cached subscription properties")
    [(o.nodes) for o in timers.values()]                                                ; print("Cached timer properties")
    [(o.callback_instances, o.owner, o.owner_info) for o in callback_objects.values()]  ; print("Cached callback object properties")
    [(o.callback_objs) for o in callback_symbols.values()]                               ; print("Cached callback symbol properties")
    [(o.publishers, o.subscriptions) for o in topics.values()]                          ; print("Cached topic properties")

    fields_to_pickle = [
        "nodes",
        "publishers",
        "subscriptions",
        "timers",
        "timer_node_links",
        "subscription_objects",
        "callback_objects",
        "callback_symbols",
        "publish_instances",
        "callback_instances",
        "topics"
    ]

    pkl_dict = {key: globals()[key] for key in fields_to_pickle}

    print("Pickling...")
    with open(pkl_filename, "wb") as f:
        pickle.dump(pkl_dict, f)
else:
    print("Found pickled ROS objects from previous session, restoring...")
    with open(pkl_filename, "rb") as f:
        pkl_dict = pickle.load(f)
        for k, v in pkl_dict.items():
            globals()[k] = v

print("Done.")


Found pickled ROS objects from previous session, restoring...
Done.


# Callback-Sub & Callback-Timer Links

In [None]:
import re

sym_table = []

for sym in callback_symbols.values():
    try:
        cbo = list(filter(lambda val: val.callback_object==sym.id, callback_objects.values()))
        assert len(cbo) == 1
        cbo = cbo[0]
    except:
        print(len(cbo))
        continue
    owner_info = cbo.owner_info

    if None in owner_info: continue
    type, info = owner_info
    sym_table.append((sym, type, info))

sym_table.sort(key=lambda tup: tup[1])

def trim(string, length):
    if len(string) > length:
        return f"{string[:length-3]}..."
    return string

for sym, type, info in sym_table:
    sym: CallbackSymbol
    pretty_sym = Ros2DataModelUtil._prettify(None, sym.symbol)
    pretty_sym = re.sub(r"std::shared_ptr<(.*?) *(const)?>", r"\1*", pretty_sym)
    try:
        i = len(sym.callback_obj.callback_instances)
    except KeyError:
        i = -1
    print(f"{trim(pretty_sym, 100):100s}: i={i:>4d} {type:12s} n={info['node']:40s}", end=' ') 
    if type == 'Timer':
        print(f"p={info['period']:7s}")
    elif type == 'Subscription':
        print(f"t={info['topic']:30s}")
    else:
        print()

# Topic-Node Mapping

In [None]:
# Aggregate topics that have the same pubs and subs
topic_cohorts = {}
for topic in topics.values():
    key = (frozenset({*(pub.node_handle for pub in topic.publishers)}), frozenset({*(sub.node_handle for sub in topic.subscriptions)}))
    if key not in topic_cohorts:
        topic_cohorts[key] = []
    topic_cohorts[key].append(topic)

print(f"{len(topics)} topics were aggregated into {len(topic_cohorts)} cohorts")

# Timer-Node Mapping

In [None]:
unknowns = {}

print_node_timer = lambda node_path, period: print(f"{node_path:<90s}: {1/(period*1e-9):8.2f}Hz")

for timer in timers.values():
    timer_nodes = timer.nodes
    if not timer_nodes:
        if timer.period not in unknowns:
            unknowns[timer.period] = 0
        unknowns[timer.period] += 1

    for node in timer_nodes: print_node_timer(node.path, timer.period)
        
for period, count in unknowns.items():
    print_node_timer(f"UNKNOWN (x{count})", period)

n_unknown = sum(unknowns.values())  # Values are counts per period
print(f"Found {len(timers) - n_unknown} timers with a recorded node, {n_unknown} without.")

# Measure Frequency Deviations

In [None]:
# Get Publisher frequencies
df_publications = handler.data.rcl_publish_instances
pub_stats = {}
unknown = 0
for pi in publish_instances:
    try:
        pub = pi.publisher
    except KeyError:
        unknown += 1
        continue
    if pub.id not in pub_stats:
        pub_stats[pub.id] = {'times': []}
    pub_stats[pub.id]['times'].append(pi.timestamp*1e-9) # Nanoseconds to seconds float

print(f"{unknown} unknown publisher handles ({len(pub_stats)} known ones)")

# Plot Frequency Deviations

In [None]:
fig_dirname = "fig_frequency"
os.makedirs(fig_dirname, exist_ok=True)
for i, (k, v) in enumerate(sorted(pub_stats.items(), key=lambda kv: len(kv[1]['times']), reverse=True)):
    pub_time_diff = np.diff(np.array(v['times']))
    v['period'] = pub_time_diff.mean()
    v['period_std'] = pub_time_diff.std()
    v['frequency'] = 1 / v['period']
    v['frequency_std'] = (1/pub_time_diff).std()

    try:
        publisher = publishers[k]
        publisher_node = publisher.node
        topic_name = publisher.topic_name
        node_path = publisher_node.path
    except Exception:
        topic_name="UNKNOWN"
        node_path="UNKNOWN"
    
    fig = plt.figure(figsize=(15,5))
    ax = fig.add_subplot()
    ax.hist(1/pub_time_diff)
    ax.set_xlabel("Publication Frequency [Hz]")
    ax.set_ylabel("#Publications")
    ax.set_title(f"{node_path} =({v['frequency']:.2f}Hz)=> {topic_name}")
    plt.savefig('/'.join((fig_dirname, f"{i:06}{node_path}__{topic_name}".replace('/','-'))))


# Data Flow Graph

In [None]:
node_filters = ["transform_listener_impl", "_monitor"]
topic_filters = ["/rosout", "/parameter_events", "/diagnostics"]

from pyvis.network import Network
net = Network(notebook=True, height='750px', width='100%', bgcolor='#ffffff', font_color='#000000')

net.add_node("INPUT", label="Input", size=100, color="green", physics=False, x=0, y=0)
net.add_node("OUTPUT", label="Output", size=100, color="red", physics=False, x=6000, y=0)


for node in nodes.values():
    if any(f in node.path for f in node_filters): 
        continue
    net.add_node(node.id, label=node.name, title=node.path, size=20, color="#333")

for cohort_key, cohort_topics in topic_cohorts.items():
    cohort_topic_names = [topic.name for topic in cohort_topics if not any(f in topic.name for f in topic_filters)]
    if not cohort_topic_names: 
        continue
    cohort_id="\n".join(cohort_topic_names)
    cohort_weight=len(cohort_topic_names)
    net.add_node(cohort_id, label=" ", title=cohort_id, size=5, color="#333")
    
    pubs = cohort_key[0]
    subs = cohort_key[1]
    n_pubs = len(pubs)
    n_subs = len(subs)
    
    try:
        if not n_pubs:
            net.add_edge("INPUT", cohort_id, arrows="to", color="green", weight=cohort_weight)
        if not n_subs:
            net.add_edge(cohort_id, "OUTPUT", arrows="to", color="red", weight=cohort_weight)

        for pub in pubs:
            net.add_edge(pub, cohort_id, arrows="to", color="green", weight=cohort_weight)
        for sub in subs:
            net.add_edge(cohort_id, sub, arrows="to", color="red", weight=cohort_weight)
    except:
        continue

net.toggle_physics(True)
net.show_buttons()
net.show("graph.html")

# Pub-Use Latencies
Compute for each node and its data dependencies the list of pub-use delays (per-topic-per-node list of pub-use delays)

In [None]:
def filter_none(ls):
    return filter(lambda x: x is not None, ls)

def safe_map(func, ls):
    def safe_func(arg):
        try:
            return func(arg)
        except:
            return None
    
    return map(safe_func, ls)

pub_use_delays = {node.id: {
    'pubs': {}, 
    'invocations': {}, 
    'n_unknown_invocations': 0, 
    'n_pub_timestamps': 0
    } for node in nodes.values()}

for node in nodes.values():
    node_pub_use_dict = pub_use_delays[node.id]
    timestamp_min = np.inf; timestamp_max = 0

    n_pub_timestamps = 0
    for sub in node.subscriptions:
        node_pub_use_dict['pubs'][sub.topic_name] = {}
        for pub in sub.publishers:
            pub_timestamps = [inst.timestamp for inst in pub.instances]

            try:
                pub_t_min = min(pub_timestamps); pub_t_max = max(pub_timestamps)
            except ValueError:
                pub_t_min = np.inf; pub_t_max = 0
            
            if pub_t_min < timestamp_min: timestamp_min = pub_t_min
            if pub_t_max > timestamp_max: timestamp_max = pub_t_max

            node_pub_use_dict['pubs'][sub.topic_name][pub.node.path] = pub_timestamps
            node_pub_use_dict['n_pub_timestamps'] += len(pub_timestamps)

    timer_cb_objs = list(filter_none(safe_map(lambda timer: timer.callback_object, node.timers)))
    subsc_cb_objs = list(filter_none(safe_map(lambda subsc: subsc.subscription_object.callback_object, node.subscriptions)))

    print(f"{node.path:95s} has {len(timer_cb_objs):1d} timer callbacks, {len(subsc_cb_objs):2d} subscription callbacks, {len(node_pub_use_dict['pubs']):2d} subscribed topics.")

    node_invocations = node_pub_use_dict['invocations']

    for cb_obj in timer_cb_objs + subsc_cb_objs:
        cb_invocations = []
        for inst in cb_obj.callback_instances:
            cb_invocations.append((inst.timestamp, inst.duration))

        node_invocations[cb_obj.id] = cb_invocations

In [None]:
from matplotlib import cm

fig_dirname = "fig_pub_use"
os.makedirs(fig_dirname, exist_ok=True)
plt.close('all')

node_filters=[]#"transform_listener_impl",]

nodes_filtered = [node for node in nodes.values() if not any(f in node.path for f in node_filters)]
print(f"Ignoring {len(nodes.values()) - len(nodes_filtered)} nodes due to filters.")

common_offset = min(map(lambda cb_inst: cb_inst.timestamp.timestamp(), callback_instances))

zero_color = cm.get_cmap('viridis')(0.0)

for node_i, (node, node_path, node_pub_use_dict) in enumerate(map(lambda node: (node, node.path, pub_use_delays[node.id]), nodes_filtered)):

    if not node_pub_use_dict['invocations']:
        print(f"{node_path:95s} has no invocations, skipping.")
        continue

    if len(node_pub_use_dict['pubs']) == 0:
        print(f"Skipping {node_path}, no publications")
        continue

    fig = plt.figure(figsize=(15,5))
    ax: plt.Axes = fig.add_subplot()

    max_pubs_per_topic = max(len(pubs) for pubs in node_pub_use_dict['pubs'].values())
    topic_names, topic_pubs = (zip(*node_pub_use_dict['pubs'].items()))

    vmin = 0; vmax = max_pubs_per_topic

    y_labels = []
    current_y = 0

    for invoc_i, (cb_obj_id, cb_invocations) in enumerate(node_pub_use_dict['invocations'].items()):
        try:
            cb_obj = callback_objects[cb_obj_id]
            sym = callback_symbols[cb_obj.callback_object].symbol
            sym = Ros2DataModelUtil._prettify(None, sym)
            sym = re.sub(r"std::shared_ptr<(.*?)>", r"\1*", sym)

            cb_owner = cb_obj.owner
            if isinstance(cb_owner, Timer):
                cb_type = "T"
            elif isinstance(cb_owner, SubscriptionObject):
                cb_type = "S"
        except KeyError or AttributeError:
            sym = "UNKNOWN"
            cb_type = "U"
        
        y_labels.append(f"{sym} {cb_type}")
        n_markers = len(cb_invocations)

        points_x = []; points_y = []
        for time, dur in cb_invocations:
            time = time.timestamp() - common_offset; dur = dur.total_seconds()
            points_x += [time, time+dur, None]
            points_y += [current_y, current_y, 0.0]
        
        ax.plot(points_x,points_y, marker='.', c=zero_color)
        current_y += 1

    n_cbs = current_y

    for topic_i, (topic_name, pubs) in enumerate(zip(topic_names, topic_pubs)):
        for pub_i, (pub_name, timestamps) in enumerate(pubs.items()):
            n_markers = len(timestamps)
            ax.scatter(np.array(timestamps)*1e-9 - common_offset, (current_y,) * n_markers, marker='.', c=(pub_i,) * n_markers, vmin=vmin, vmax=vmax)
        
        y_labels.append(topic_name)
        current_y += 1
    
    trigger_strs = []
    t = node.timers
    if t:
        n_timers = len(t)
        freqs = map(lambda timer: 1 / (timer.period*1e-9), t)
        trigger_strs.append(f"{n_timers} timer{'s' if n_timers != 1 else ''}, {'Hz, '.join((f'{freq:.0f}' for freq in freqs))}Hz")
    if node.subscriptions:
        n_subs = len(node.subscriptions)
        trigger_strs.append(f"{n_subs} subscription{'s' if n_subs != 1 else ''}")

    ax.set_xlabel("Publication / Invocation Timestamp [s]")
    ax.set_ylabel("Topic")
    ax.set_yticks(range(current_y))
    ax.set_yticklabels(y_labels)
    ax.set_ylim(0 - .1, current_y - 1 + .1)
    ax.set_title(f"{node_path} ({'; '.join(trigger_strs)})")
    ax.set_xlim(50, 50.25)

    ax.hlines(n_cbs - 0.5, *ax.get_xlim(), linestyles='dashed')
    plt.savefig(os.path.join(fig_dirname, f"{node_i:06}{node_path}".replace('/','-')))

# E2E Latency Calculation

In [6]:
#################################################
# Data structures & helpers
#################################################

LatencyStats = pd.Series

@dataclass
class LatencyGraph:
    verts: Set[CallbackObject]
    edges: Dict[Tuple[CallbackObject, CallbackObject], Tuple[Topic, LatencyStats]]
    starts: Dict[CallbackObject, Topic]
    ends: Dict[CallbackObject, Topic]

def pub_use_latencies(cb_instances: List[CallbackInstance], pub_instances: List[PublishInstance]):
    cb_times = sorted([inst.timestamp.timestamp() for inst in cb_instances])

    if not pub_instances:
        return pd.Series(np.full(len(cb_instances), np.nan), index=cb_times)

    pub_times = np.array(sorted([pub.timestamp * 1e-9 for pub in pub_instances]))

    pub_use_lats = np.array([cb_time - np.max(pub_times[pub_times < cb_time], initial=-np.inf) for cb_time in cb_times])
    pub_use_lats[np.isposinf(pub_use_lats)] = np.nan
    ret_series = pd.Series(pub_use_lats, index=cb_times)
    return ret_series

def inst_runtime_interval(cb_inst):
    inst_t_min = cb_inst.timestamp.timestamp()
    inst_t_max = inst_t_min + cb_inst.duration.total_seconds()
    return (inst_t_min, inst_t_max)

def count_pub_insts_in_intervals(cb_intervals: List[Tuple[float, float]], pub_insts: List[PublishInstance]):
    """
    Counts number of publication instancess that lie within one of the cb_intervals.
    """
    pub_timestamps = [inst.timestamp * 1e-9 for inst in pub_insts]
    
    # Algorithm: Two-pointer method
    # With both the pub_timestamps and cb_intervals sorted ascending,
    # we can cut down the O(m*n) comparisons to O(m+n).
    pub_timestamps.sort()
    cb_intervals.sort(key=lambda tup: tup[0])

    n_overlaps = 0
    cb_iter = iter(cb_intervals)
    pub_iter = iter(pub_timestamps)
    (t_min, t_max) = next(cb_iter, (None, None))
    t_pub = next(pub_iter, None)

    while t_pub is not None and t_min is not None:
        if t_min <= t_pub <= t_max:  # If publication in interval, increase counter, go to next pub (multiple pubs can be within one interval)
            n_overlaps += 1
            t_pub = next(pub_iter, None)
        elif t_pub < t_min:  # If publication before interval, increase pub
            t_pub = next(pub_iter, None)
        else:  # If interval before publication, increase interval
            (t_min, t_max) = next(cb_iter, (None, None))

    return n_overlaps

#################################################
# Identify input and output topics
#################################################

in_topics = [t for t in topics.values() if not t.publishers]
out_topics = [t for t in topics.values() if not t.subscriptions]

#################################################
# For each node, work out dependencies and
# publications of each callback
#################################################

cb_to_scored_pub: Dict[CallbackObject, Set[Tuple[Publisher, float]]] = {}
topic_to_dep_cb: Dict[Topic, Set[CallbackObject]] = {}
pub_cb_to_lat_stats: Dict[Tuple[Publisher, CallbackObject], LatencyStats] = {}

with ProgressPrinter("Processing", len(callback_objects)) as p:
    for cb in callback_objects.values():
        cb_sym = callback_symbols[cb.callback_object].symbol if cb.callback_object in callback_symbols else None
        if cb_sym:
            p.step(Ros2DataModelUtil._prettify(None, cb_sym))
        else:
            p.step(cb.id)

        if "ParameterEvent" in cb_sym:
            continue

        # Find topics the callback EXPLICITLY depends on
        # - Timer callbacks: no EXPLICIT dependencies
        # - Subscription callbacks: callback depends on the subscribed topic. Possibly also has other IMPLICIT dependencies

        if type(cb.owner).__name__ == SubscriptionObject.__name__:
            owner_node = cb.owner.subscription.node
            dep_topics = [cb.owner.subscription.topic,]
        elif type(cb.owner).__name__ == Timer.__name__:
            owner_nodes = cb.owner.nodes
            if len(owner_nodes) != 1:
                raise(ValueError("Timer has more than one owner!"))
            dep_topics = []
        elif cb.owner is None:
            dep_topics = []
            continue
        else:
            raise RuntimeError(f"Callback owners other than timers/subscriptions cannot be handled: {cb.owner} {cb.owner_info}")

        for topic in dep_topics: 
            if topic not in topic_to_dep_cb:
                topic_to_dep_cb[topic] = set()
            topic_to_dep_cb[topic].add(cb)

            for pub in topic.publishers:
                pub_cb_to_lat_stats[(pub, cb)] = pub_use_latencies(cb.callback_instances, pub.instances)

        # Find topics the callback publishes to (HEURISTICALLY!)
        # For topics published to during the runtime of the callback's instances, 
        # assume that they are published by the callback

        cb_runtime_intervals = [inst_runtime_interval(inst) for inst in cb.callback_instances]
        cb_pub_overlap_counts = [count_pub_insts_in_intervals(cb_runtime_intervals, pub.instances) for pub in owner_node.publishers]

        for pub, olap_count in zip(owner_node.publishers, cb_pub_overlap_counts):
            if olap_count == 0 or not pub.instances:
                continue
            score = olap_count / len(pub.instances)

            if cb not in cb_to_scored_pub: 
                cb_to_scored_pub[cb] = set()
            cb_to_scored_pub[cb].add((pub, score))


(1842/1842) Processing done.                                                                                                                   


In [19]:
pub_to_scored_cb = {}

verts = set(callback_objects.values())
edges = {}
for send_cb, scored_pubs in cb_to_scored_pub.items():
    for pub, score in scored_pubs:
        if score == 0.0:
            continue
        if pub not in pub_to_scored_cb:
            pub_to_scored_cb[pub] = []
        pub_to_scored_cb[pub].append((send_cb, score))
        receiver_cbs = [sub_obj.callback_object for sub in pub.subscriptions for sub_obj in sub.subscription_objects]
        for recv_cb in receiver_cbs:
            edges[(send_cb, recv_cb)] = (pub.topic, pub_cb_to_lat_stats[(pub, recv_cb)])

for pub, scored_cbs in pub_to_scored_cb.items():
    if len(scored_cbs) > 1:
        def _mapfun(tup):
            cb, score = tup
            pretty_sym = Ros2DataModelUtil._prettify(None, callback_symbols[cb.callback_object].symbol)
            pretty_sym = re.sub(r"std::shared_ptr<(.*?) *(const)?>", r"\1*", pretty_sym)
            return f'{score*100:>10.6f}% {pretty_sym}'
        cbstr = ',\n  '.join(map(_mapfun, scored_cbs))
        print(f"{pub.topic_name}:\n  {cbstr}")

inputs = {}
outputs = {}
for topic in out_topics:
    outputs.update({cb: topic for pub in topic.publishers if pub in pub_to_scored_cb for cb, score in pub_to_scored_cb[pub]})
for topic in in_topics:
    inputs.update({sub_obj.callback_object: topic for sub in topic.subscriptions for sub_obj in sub.subscription_objects})


#################################################
# Filter callback objects and topics
#################################################

callback_symbol_filters = [
    "rcl_interfaces::msg::ParameterEvent", "diagnostic_updater::Updater", 
    "rclcpp::ParameterService::ParameterService", "tf2_ros::TransformListener",
    "rclcpp_components::ComponentManager", "diagnostic_aggregator::Aggregator"
    ]

verts = set(filter(lambda vert: not any(f in callback_symbols[vert.callback_object].symbol for f in callback_symbol_filters), verts))
edges = {(cb1, cb2): val for (cb1, cb2), val in edges.items() if cb1 in verts and cb2 in verts}
outputs = {cb: topic for cb, topic in outputs.items() if cb in verts}
inputs = {cb: topic for cb, topic in inputs.items() if cb in verts}

latency_graph = LatencyGraph(verts, edges, inputs, outputs)

/diagnostics:
   93.922652% void (diagnostic_updater::Updater::?)(),
    6.629834% void (ProcessMonitor::?)()
/diagnostics:
    0.202429% void (rclcpp::TimeSource::?)(rosgraph_msgs::msg::Clock*),
   60.526316% void (planning_diagnostics::PlanningErrorMonitorNode::?)()
/awapi/vehicle/get/status:
    0.689655% void (rclcpp::TimeSource::?)(rosgraph_msgs::msg::Clock*),
  100.000000% void (autoware_api::AutowareIvAdapter::?)()
/diagnostics:
    0.005231% void (rclcpp::TimeSource::?)(rosgraph_msgs::msg::Clock*),
    0.005231% void (NDTScanMatcher::?)(geometry_msgs::msg::PoseWithCovarianceStamped*),
    0.428960% void (NDTScanMatcher::?)(sensor_msgs::msg::PointCloud2*),
    4.723792% void (NDTScanMatcher::?)(sensor_msgs::msg::PointCloud2*),
    0.136012% void (AutowareStateMonitorNode::?)()
/localization/pose_estimator/pose_with_covariance:
    0.384615% void (rclcpp::TimeSource::?)(rosgraph_msgs::msg::Clock*),
  100.000000% void (NDTScanMatcher::?)(sensor_msgs::msg::PointCloud2*)
/localizati

In [20]:
#################################################
# Get intra-node dependencies from settings
#################################################

def _find_node(path):
    return next(filter(lambda n: n.path == path, nodes.values()))

from ruamel.yaml import YAML

yaml = YAML()
with open("settings/intra-node-data-deps.yaml", "r") as f:
    node_internal_deps = yaml.load(f)
    # Convert node path to node instance
    node_internal_deps = {_find_node(path): {
            callback_objects[cb_id]: [callback_objects[dep_id] for dep_id in dep_ids]
            for cb_id, dep_ids 
            in deps.items()
        } for path, deps in node_internal_deps.items()}

for node, cb_mappings in node_internal_deps.items():
    print(node)
    for cb, deps in cb_mappings.items():
        print("  ", type(cb).__name__, "->", ' '.join(map(lambda x: type(x).__name__, deps)))

KeyError: 281471221778768

In [None]:

#################################################
# Plot DFG
#################################################

import graphviz as gv

g = gv.Digraph('G', filename="latency_graph.gv", node_attr={'shape': 'record', 'margin': '0.00001', 'width': '0.00001', 'height': '0.001'}, graph_attr={'pack': '1'})
g.graph_attr['rankdir'] = 'LR'

g.node("INPUT", gv.nohtml("{INPUT |<out>}"))
g.node("OUTPUT", gv.nohtml("{<in> |OUTPUT}"))

nodes_to_cbs = {}
export_dict = {}

for vert in latency_graph.verts:
    vert: CallbackObject

    if isinstance(vert.owner, Timer):
        owner_nodes = vert.owner.nodes
    elif isinstance(vert.owner, SubscriptionObject):
        owner_nodes = [vert.owner.subscription.node]
    else:
        owner_nodes = []

    if len(owner_nodes) > 1:
        raise RuntimeError(f"CB has owners {', '.join(map(lambda n: n.path, owner_nodes))}")
    elif not owner_nodes:
        print("[WARN] CB has no owners")
        continue

    owner = owner_nodes[0]
    if not owner in nodes_to_cbs: nodes_to_cbs[owner] = []
    nodes_to_cbs[owner].append(vert)
    if not owner.path in export_dict: export_dict[owner.path] = []

for node, cbs in nodes_to_cbs.items():
    with g.subgraph(name=f"cluster_{node.path}") as c:
        c.attr(label=node.path)
        c.attr(margin='0.0')
        c.attr(bgcolor='lightgray')

        for cb in cbs:
            cb: CallbackObject
            pretty_sym = Ros2DataModelUtil._prettify(None, callback_symbols[cb.callback_object].symbol)
            pretty_sym = re.sub(r"std::shared_ptr<(.*?) *(const)?>", r"\1*", pretty_sym)
            pretty_sym = pretty_sym.replace('{', '\\{').replace('}', '\\}')

            export_dict[node.path].append({cb.id: {
                'symbol': pretty_sym, 
                'ins': sum(map(lambda k: k[1].id == cb.id, latency_graph.edges.keys())) + sum(map(lambda k: k.id == cb.id, latency_graph.starts.keys())), 
                'outs': sum(map(lambda k: k[0].id == cb.id, latency_graph.edges.keys())) + sum(map(lambda k: k.id == cb.id, latency_graph.ends.keys()))
                }})

            cb_durations = np.array(list(map(lambda inst: inst.duration.total_seconds(), cb.callback_instances)))
            if len(cb_durations) == 0:
                cb_durations = np.zeros(1)
            cb_dur_stats = (cb_durations.min(), cb_durations.mean(), cb_durations.max())
            c.node(str(cb.id), gv.nohtml(f"{{<in> |{pretty_sym} |<out>}}"), tooltip=f"{cb_dur_stats[0]*1e6:.0f}µs, {cb_dur_stats[1]*1e6:.0f}µs, {cb_dur_stats[2]*1e6:.0f}µs", fontcolor=('red' if isinstance(cb.owner, Timer) else 'black'))


for (c1, c2), (topic, lat_stats) in latency_graph.edges.items():
    g.edge(f"{c1.id}:out", f"{c2.id}:in", tooltip=f"{topic.name} ({lat_stats.mean()*1000:.2f}ms)")

for c, t in latency_graph.starts.items():
    g.edge("INPUT:out", f"{c.id}:in", tooltip=t.name)

for c, t in latency_graph.ends.items():
    g.edge(f"{c.id}:out", "OUTPUT:in", tooltip=t.name)

for n, deps in node_internal_deps.items():
    for cb, dep_cbs in deps.items():
        for dep_cb in dep_cbs:
            g.edge(f"{dep_cb.id}:out", f"{cb.id}:in", tooltip="node-internal data dependency", color='red')

with open("settings/node-cbs.yaml", "w") as f:
    yaml = YAML()
    yaml.dump(export_dict, f)

g.save("latency_graph.gv")

g

In [10]:

#################################################
# Transitively add latencies to get E2E latency
#################################################

@dataclass
class PathElem:
    src: CallbackObject
    dst: CallbackObject
    topic: Topic
    latencies: LatencyStats

def get_latency_paths(cb1, cb_to_cb_to_lat_stats, goals, parent_path=[]) -> List[List[PathElem]]:
    if cb1 in goals:
        return [parent_path]

    if cb1 not in cb_to_cb_to_lat_stats:
        return [parent_path]

    paths = []
    for cb2 in cb_to_cb_to_lat_stats[cb1]:
        for topic, lats in cb_to_cb_to_lat_stats[cb1][cb2].items():
            new_paths = get_latency_paths(cb2, cb_to_cb_to_lat_stats, goals, parent_path + [PathElem(cb1, cb2, topic, lats)])
            paths += new_paths

    return paths


cb_to_cb_to_lat_stats = {}
for (pub, cb2), lat_stats in pub_cb_to_lat_stats.items():
    if pub not in pub_to_scored_cb:
        #print(f"[WARN] Pub on topic {pub.topic.name} not in pub_to_scored_cb, skipping.")
        print(end='.')
        continue
    if len(pub_to_scored_cb[pub]) > 1:
        #print(f"[WARN] Pub on topic {pub.topic.name} has {len(pub_to_scored_cb[pub])} callbacks.")
        print(end='#')
    for cb1, score in pub_to_scored_cb[pub]:
        if score != 1.0:
            #print(f"[WARN] Callback for topic {pub.topic.name} only has a score of {score*100:.3f}%")
            print(end='*')
        if cb1 in cb_to_cb_to_lat_stats and cb2 in cb_to_cb_to_lat_stats[cb1]:
            #print(f"[WARN] Pair of callbacks already in dict!")
            print(end='_')
        else:
            if cb1 not in cb_to_cb_to_lat_stats:
                cb_to_cb_to_lat_stats[cb1] = {}
            if cb2 not in cb_to_cb_to_lat_stats[cb1]:
                cb_to_cb_to_lat_stats[cb1][cb2] = {}
        cb_to_cb_to_lat_stats[cb1][cb2][pub.topic] = pub_cb_to_lat_stats[(pub, cb2)]

latency_paths = []
for cb in inputs:
    latency_paths += get_latency_paths(cb, cb_to_cb_to_lat_stats, outputs.keys())

def pl(l, lvl=0):
    if isinstance(l, list):
        print("  "*lvl, type(l), len(l))
        for i in l:
            pl(i, lvl+1)
    else:
        print("  "*lvl, type(l))

#pl(latency_paths)

for i, path in enumerate(latency_paths):
    print("===== PATH", i, "=====")
    #print(type(path))
    tot_lat = 0.0
    for item in path:
        tot_lat += item.latencies.mean()
        print(f"{item.topic.name:120s} {item.latencies.mean()*1000:.3f}ms {tot_lat*1000:.3f}ms")

....................................*.......................................*.......................................*........................................*.......................................*...#*....................................*.......................................*.......................................*.........................................*.......................................*.......................................*.......................................*.....#*..........................................*.......................................*.......................................*.......................................*.....#**.....................................*.......................................*.......................................*.......................................*.......................................*.......................................*.......................................*.......................................*......................

In [None]:
fig = plt.figure(figsize=(30, 10))
ax = fig.add_subplot()

for lat_stats in pub_cb_to_lat_stats.values():
    ax.plot(lat_stats.index, np.where(np.isnan(lat_stats), 0, lat_stats))

ax.set_ylim(0, .1)
ax.set_xlim(655+1.652795e9, 660+1.652795e9)
None