Extract tracetools_trace from tracetools_analysis
This commit is contained in:
parent
6aba4109c3
commit
17fe8d2245
22 changed files with 614 additions and 272 deletions
2
tracetools_analysis/__init__.py
Normal file
2
tracetools_analysis/__init__.py
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
"""Reading and interpreting of LTTng trace data."""
|
||||
__author__ = 'Luetkebohle Ingo (CR/AEX3)'
|
||||
0
tracetools_analysis/analysis/__init__.py
Normal file
0
tracetools_analysis/analysis/__init__.py
Normal file
36
tracetools_analysis/analysis/lttng_models.py
Normal file
36
tracetools_analysis/analysis/lttng_models.py
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
# Model objects for LTTng traces/events
|
||||
|
||||
def get_field(event, field_name, default=None):
    """
    Look up a field in an event dictionary.

    :param event (dict): the event to read from
    :param field_name (str): the name of the field to look up
    :param default: the value to return when the field is absent
    :return: the field's value, or ``default`` if the event has no such field
    """
    try:
        return event[field_name]
    except KeyError:
        return default
|
||||
|
||||
def get_name(event):
    """
    Extract an event's name.

    :param event (dict): the event to read from
    :return: the value of the '_name' field, or None if absent
    """
    # '_name' is the synthetic key written by the CTF-to-pod conversion
    return event.get('_name')
|
||||
|
||||
|
||||
class EventMetadata():
    """Immutable container for the context fields common to every trace event."""

    def __init__(self, event_name, pid, tid, timestamp, procname):
        """
        Constructor.

        :param event_name: the name of the event
        :param pid: the process ID
        :param tid: the thread ID
        :param timestamp: the event timestamp
        :param procname: the process name
        """
        self._name = event_name
        self._process_id = pid
        self._thread_id = tid
        self._time = timestamp
        self._process_name = procname

    @property
    def event_name(self):
        """Name of the event."""
        return self._name

    @property
    def pid(self):
        """Process ID of the event's emitter."""
        return self._process_id

    @property
    def tid(self):
        """Thread ID of the event's emitter."""
        return self._thread_id

    @property
    def timestamp(self):
        """Timestamp of the event."""
        return self._time

    @property
    def procname(self):
        """Process name of the event's emitter."""
        return self._process_name
|
||||
75
tracetools_analysis/analysis/ros_processor.py
Normal file
75
tracetools_analysis/analysis/ros_processor.py
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
# Process trace events and create ROS model
|
||||
|
||||
import sys
|
||||
from .lttng_models import EventMetadata, get_field, get_name
|
||||
|
||||
def ros_process(events):
    """
    Process unpickled events and create a ROS model.

    :param events (list(dict(str:str))): the list of events to process
    :return: the processor object holding the built model
    """
    ros_processor = RosProcessor()
    ros_processor.process_events(events)
    return ros_processor
|
||||
|
||||
class RosProcessor():
    """
    ROS-aware event processing/handling class.

    Handles a trace's events and builds a model with the data.
    """

    def __init__(self):
        # TODO add other stuff
        # Pending callback_start timestamps, keyed by callback address,
        # held until the matching callback_end event is seen
        self._callback_starts = {}
        # Completed callback instances,
        # callback_address: list of (duration, start_timestamp)
        self.callbacks_instances = {}

        # Link a ROS trace event to its corresponding handling method
        self._handler_map = {
            'ros2:rcl_subscription_init': self._handle_subscription_init,
            'ros2:rclcpp_subscription_callback_added': self._handle_subscription_callback_added,
            'ros2:rclcpp_subscription_callback_start': self._handle_subscription_callback_start,
            'ros2:rclcpp_subscription_callback_end': self._handle_subscription_callback_end,
        }

    def process_events(self, events):
        """
        Process events.

        :param events (list(dict(str:str))): the events to process
        """
        for event in events:
            self._handle(event)

    def _handle(self, event):
        # Dispatch one event to its handler, building common metadata first.
        event_name = get_name(event)
        handler_function = self._handler_map.get(event_name, None)
        if handler_function is not None:
            # Prefer the virtual (vpid/vtid) IDs; fall back to the raw ones
            pid = get_field(event, 'vpid', default=get_field(event, 'pid'))
            tid = get_field(event, 'vtid', default=get_field(event, 'tid'))
            timestamp = get_field(event, '_timestamp')
            procname = get_field(event, 'procname')
            metadata = EventMetadata(event_name, pid, tid, timestamp, procname)
            handler_function(event, metadata)
        else:
            # NOTE: this fires for every non-ROS event in the trace (sched, etc.)
            print(f'unhandled event name: {event_name}', file=sys.stderr)

    def _handle_subscription_init(self, event, metadata):
        # TODO
        pass

    def _handle_subscription_callback_added(self, event, metadata):
        # Add the callback address key and create an empty list
        callback_addr = get_field(event, 'callback')
        self.callbacks_instances[callback_addr] = []

    def _handle_subscription_callback_start(self, event, metadata):
        # Remember when this callback started; matched in _handle_subscription_callback_end
        callback_addr = get_field(event, 'callback')
        self._callback_starts[callback_addr] = metadata.timestamp

    def _handle_subscription_callback_end(self, event, metadata):
        callback_addr = get_field(event, 'callback')
        start_timestamp = self._callback_starts.pop(callback_addr, None)
        if start_timestamp is not None:
            duration = metadata.timestamp - start_timestamp
            # setdefault guards against an end event whose callback_added
            # event was not captured in the trace (previously raised KeyError)
            self.callbacks_instances.setdefault(callback_addr, []).append(
                (duration, start_timestamp))
|
||||
20
tracetools_analysis/analysis/to_pandas.py
Normal file
20
tracetools_analysis/analysis/to_pandas.py
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
# Convert processor object to pandas dataframe
|
||||
|
||||
import pandas as pd
|
||||
from .ros_processor import RosProcessor
|
||||
|
||||
def callback_durations_to_df(ros_processor):
    """
    Build a pandas DataFrame of callback instances.

    One row per callback instance, with columns 'callback_address',
    'duration', and 'start_timestamp'.

    :param ros_processor: the processor whose ``callbacks_instances`` to convert
    :return: the resulting DataFrame
    """
    addresses = []
    durations = []
    starts = []
    for address, instances in ros_processor.callbacks_instances.items():
        for duration, start_timestamp in instances:
            addresses.append(address)
            durations.append(duration)
            starts.append(start_timestamp)

    data = {
        'callback_address': addresses,
        'duration': durations,
        'start_timestamp': starts,
    }
    return pd.DataFrame(data=data)
|
||||
0
tracetools_analysis/conversion/__init__.py
Normal file
0
tracetools_analysis/conversion/__init__.py
Normal file
63
tracetools_analysis/conversion/ctf.py
Normal file
63
tracetools_analysis/conversion/ctf.py
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
# CTF to pickle conversion
|
||||
|
||||
import babeltrace
|
||||
from pickle import Pickler
|
||||
import time
|
||||
|
||||
# List of ignored CTF fields
|
||||
# List of ignored CTF fields
# (low-level packet/stream bookkeeping not useful for analysis)
_IGNORED_FIELDS = [
    'content_size', 'cpu_id', 'events_discarded', 'id', 'packet_size', 'packet_seq_num',
    'stream_id', 'stream_instance_id', 'timestamp_end', 'timestamp_begin', 'magic', 'uuid', 'v'
]
# Field counting events dropped by the tracer; checked separately to flag data loss
_DISCARD = 'events_discarded'
|
||||
|
||||
def ctf_to_pickle(trace_directory, target):
    """
    Load CTF trace and convert to a pickle file.

    Each event is converted to a plain dictionary and dumped individually
    to the target pickler.

    :param trace_directory (str): the main/top trace directory
    :param target (Pickler): the target pickle file to write to
    """
    # add traces
    trace_collection = babeltrace.TraceCollection()
    print(f'Importing trace directory: {trace_directory}')
    trace_collection.add_traces_recursive(trace_directory, 'ctf')

    count = 0
    count_written = 0

    start_time = time.time()

    for event in trace_collection.events:
        count += 1
        # Write all events for now (no PID-based filtering)
        pod = _ctf_event_to_pod(event)
        target.dump(pod)
        count_written += 1

    time_diff = time.time() - start_time
    print(f'{count_written} events in {time_diff * 1000:.2f} ms')
|
||||
|
||||
|
||||
def _ctf_event_to_pod(ctf_event):
    """
    Convert a CTF event to a plain-old-data dictionary.

    Includes the event's name ('_name'), timestamp ('_timestamp'), and every
    other field except those listed in _IGNORED_FIELDS.

    :param ctf_event: the element to convert
    :type ctf_event: babeltrace.Element
    :return: the converted event
    :return type: dict
    """
    pod = {
        '_name': ctf_event.name,
        '_timestamp': ctf_event.timestamp,
    }
    # Surface tracer-side data loss by printing the discarded-events count
    if hasattr(ctf_event, _DISCARD) and ctf_event[_DISCARD] > 0:
        print(ctf_event[_DISCARD])
    pod.update({
        field: ctf_event[field]
        for field in ctf_event.keys()
        if field not in _IGNORED_FIELDS
    })
    return pod
|
||||
19
tracetools_analysis/convert.py
Normal file
19
tracetools_analysis/convert.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
#!/usr/bin/env python3
|
||||
# Entrypoint/script to convert CTF trace data to a pickle file
|
||||
# TODO
|
||||
|
||||
import sys
|
||||
from pickle import Pickler
|
||||
from tracetools_analysis.conversion.ctf import *
|
||||
|
||||
def main(argv=sys.argv):
    """
    Convert a CTF trace directory to a pickle file.

    :param argv (list(str)): command-line arguments; argv[1] is the trace
        directory and argv[2] is the pickle file to create
    """
    if len(argv) != 3:
        print('usage: /trace/directory pickle_target_file')
        sys.exit(1)

    # Bug fix: read from the argv parameter instead of ignoring it
    # in favour of the process-wide sys.argv
    trace_directory = argv[1]
    pickle_target_file = argv[2]

    with open(pickle_target_file, 'wb') as f:
        # Protocol 4 supports large objects; readable from Python >= 3.4
        p = Pickler(f, protocol=4)
        ctf_to_pickle(trace_directory, p)
|
||||
33
tracetools_analysis/process.py
Normal file
33
tracetools_analysis/process.py
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
#!/usr/bin/env python3
|
||||
# Entrypoint/script to process events from a pickle file to build a ROS model
|
||||
|
||||
import sys
|
||||
import pickle
|
||||
import pandas as pd
|
||||
from tracetools_analysis.analysis.ros_processor import *
|
||||
from tracetools_analysis.analysis.to_pandas import *
|
||||
|
||||
def main(argv=sys.argv):
    """
    Process a pickle file of trace events into a ROS model and print it.

    :param argv (list(str)): command-line arguments; argv[1] is the
        pickle file to read
    """
    if len(argv) != 2:
        print('usage: pickle_file')
        sys.exit(1)

    # Bug fix: read from the argv parameter instead of ignoring it
    # in favour of the process-wide sys.argv
    pickle_filename = argv[1]
    with open(pickle_filename, 'rb') as f:
        events = _get_events_from_pickled_file(f)
        print(f'imported {len(events)} events')
        processor = ros_process(events)

    df = callback_durations_to_df(processor)
    print(df.to_string())
|
||||
|
||||
|
||||
def _get_events_from_pickled_file(file):
|
||||
p = pickle.Unpickler(file)
|
||||
events = []
|
||||
while True:
|
||||
try:
|
||||
events.append(p.load())
|
||||
except EOFError as _:
|
||||
break # we're done
|
||||
return events
|
||||
0
tracetools_analysis/test/__init__.py
Normal file
0
tracetools_analysis/test/__init__.py
Normal file
41
tracetools_analysis/test/utils.py
Normal file
41
tracetools_analysis/test/utils.py
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
# Utils for tracetools testing
|
||||
|
||||
import subprocess
|
||||
import babeltrace
|
||||
from ..trace import *
|
||||
|
||||
def get_trace_event_names(trace_directory):
    """
    Get the set of event names present in a trace.

    :param trace_directory (str): the path to the main/top trace directory
    :return: event names (set(str))
    """
    trace_collection = babeltrace.TraceCollection()
    trace_collection.add_traces_recursive(trace_directory, 'ctf')
    return {event.name for event in trace_collection.events}
|
||||
|
||||
|
||||
def run_and_trace(package_name, executable_name, session_name, path):
    """
    Setup, start tracing, and run a ROS 2 executable.

    Blocks until the executable exits, then stops and destroys the
    tracing session.

    :param package_name (str): the name of the package
    :param executable_name (str): the name of the executable to run
    :param session_name (str): the name of the tracing session
    :param path (str): the path of the main directory to write trace data to
    """
    # Enable all events
    lttng_setup(session_name, path)
    lttng_start(session_name)
    _run(package_name, executable_name)
    lttng_stop(session_name)
    lttng_destroy(session_name)
|
||||
|
||||
|
||||
def _run(package_name, executable_name):
    # Launch the executable via `ros2 run` and wait for it to finish;
    # raises CalledProcessError on a non-zero exit code.
    command = ['ros2', 'run', package_name, executable_name]
    subprocess.check_call(command)
|
||||
0
tracetools_analysis/tracing/__init__.py
Normal file
0
tracetools_analysis/tracing/__init__.py
Normal file
Loading…
Add table
Add a link
Reference in a new issue