Add 'tracetools_analysis/' from commit '62fe169634'

git-subtree-dir: tracetools_analysis
git-subtree-mainline: e5c64669b8
git-subtree-split: 62fe169634
This commit is contained in:
Christophe Bedard 2019-06-24 10:09:52 +02:00
commit 4782b27aa9
21 changed files with 833 additions and 0 deletions

3
tracetools_analysis/.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
*~
*.pyc

View file

View file

@ -0,0 +1,5 @@
*.svg
*.png
*.pdf
.ipynb_checkpoints

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>tracetools_analysis</name>
<version>0.0.1</version>
<description>Tools for analysing trace data</description>
<maintainer email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</maintainer>
<license>TODO</license>
<author email="ingo.luetkebohle@de.bosch.com">Ingo Luetkebohle</author>
<author email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</author>
<exec_depend>python3-babeltrace</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View file

@ -0,0 +1,4 @@
[develop]
script-dir=$base/lib/tracetools_analysis
[install]
install-scripts=$base/lib/tracetools_analysis

View file

@ -0,0 +1,29 @@
from setuptools import find_packages
from setuptools import setup
package_name = 'tracetools_analysis'
setup(
name=package_name,
version='0.0.1',
packages=find_packages(exclude=['test']),
data_files=[
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
maintainer='Christophe Bedard',
maintainer_email='fixed-term.christophe.bourquebedard@de.bosch.com',
author='Ingo Luetkebohle, Christophe Bedard',
author_email='ingo.luetkebohle@de.bosch.com, fixed-term.christophe.bourquebedard@de.bosch.com',
# url='',
keywords=['ROS'],
description='Tools for analysing trace data',
entry_points={
'console_scripts': [
f'convert = {package_name}.convert:main',
f'process = {package_name}.process:main',
],
},
license='TODO',
tests_require=['pytest'],
)

View file

@ -0,0 +1,23 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View file

@ -0,0 +1,23 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc = main(argv=[])
assert rc == 0, 'Found errors'

View file

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=[])
assert rc == 0, 'Found code style errors / warnings'

View file

@ -0,0 +1,2 @@
"""Reading and interpreting of LTTng trace data."""
__author__ = 'Luetkebohle Ingo (CR/AEX3)'

View file

@ -0,0 +1,133 @@
# Data model
import pandas as pd
class DataModel():
"""
Container to model pre-processed data for analysis.
Contains data for an analysis to use. This is a middleground between trace events data and the
output data of an analysis. This aims to represent the data in a ROS-aware way.
It uses pandas DataFrames directly.
"""
def __init__(self) -> None:
# Objects (one-time events, usually when something is created)
self.contexts = pd.DataFrame(columns=['context_handle',
'timestamp',
'pid',
'version'])
self.contexts.set_index(['context_handle'], inplace=True, drop=True)
self.nodes = pd.DataFrame(columns=['node_handle',
'timestamp',
'tid',
'rmw_handle',
'name',
'namespace'])
self.nodes.set_index(['node_handle'], inplace=True, drop=True)
self.publishers = pd.DataFrame(columns=['publisher_handle',
'timestamp',
'node_handle',
'rmw_handle',
'topic_name',
'depth'])
self.publishers.set_index(['publisher_handle'], inplace=True, drop=True)
self.subscriptions = pd.DataFrame(columns=['subscription_handle',
'timestamp',
'node_handle',
'rmw_handle',
'topic_name',
'depth'])
self.subscriptions.set_index(['subscription_handle'], inplace=True, drop=True)
self.services = pd.DataFrame(columns=['service_handle',
'timestamp',
'node_handle',
'rmw_handle',
'service_name'])
self.services.set_index(['service_handle'], inplace=True, drop=True)
self.clients = pd.DataFrame(columns=['client_handle',
'timestamp',
'node_handle',
'rmw_handle',
'service_name'])
self.clients.set_index(['client_handle'], inplace=True, drop=True)
self.timers = pd.DataFrame(columns=['timer_handle',
'timestamp',
'period'])
self.timers.set_index(['timer_handle'], inplace=True, drop=True)
self.callback_objects = pd.DataFrame(columns=['handle',
'timestamp',
'callback_object'])
self.callback_objects.set_index(['handle'], inplace=True, drop=True)
self.callback_symbols = pd.DataFrame(columns=['callback_object',
'timestamp',
'symbol'])
self.callback_symbols.set_index(['callback_object'], inplace=True, drop=True)
# Events (multiple instances, may not have a meaningful index)
self.callback_instances = pd.DataFrame(columns=['callback_object',
'timestamp',
'duration',
'intra_process'])
def add_context(self, context_handle, timestamp, pid, version) -> None:
self.contexts.loc[context_handle] = [timestamp, pid, version]
def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace) -> None:
self.nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace]
def add_publisher(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth) -> None:
self.publishers.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
def add_subscription(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth) -> None:
self.subscriptions.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
def add_service(self, handle, timestamp, node_handle, rmw_handle, service_name) -> None:
self.services.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
def add_client(self, handle, timestamp, node_handle, rmw_handle, service_name) -> None:
self.clients.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
def add_timer(self, handle, timestamp, period) -> None:
self.timers.loc[handle] = [timestamp, period]
def add_callback_object(self, handle, timestamp, callback_object) -> None:
self.callback_objects.loc[handle] = [timestamp, callback_object]
def add_callback_symbol(self, callback_object, timestamp, symbol) -> None:
self.callback_symbols.loc[callback_object] = [timestamp, symbol]
def add_callback_instance(self, callback_object, timestamp, duration, intra_process) -> None:
data = {
'callback_object': callback_object,
'timestamp': timestamp,
'duration': duration,
'intra_process': intra_process,
}
self.callback_instances = self.callback_instances.append(data, ignore_index=True)
def print_model(self) -> None:
"""Debug method to print every contained df."""
print('====================DATA MODEL====================')
print(f'Contexts:\n{self.contexts.to_string()}')
print()
print(f'Nodes:\n{self.nodes.to_string()}')
print()
print(f'Publishers:\n{self.publishers.to_string()}')
print()
print(f'Subscriptions:\n{self.subscriptions.to_string()}')
print()
print(f'Services:\n{self.services.to_string()}')
print()
print(f'Clients:\n{self.clients.to_string()}')
print()
print(f'Timers:\n{self.timers.to_string()}')
print()
print(f'Callback objects:\n{self.callback_objects.to_string()}')
print()
print(f'Callback symbols:\n{self.callback_symbols.to_string()}')
print()
print(f'Callback instances:\n{self.callback_instances.to_string()}')
print('==================================================')

View file

@ -0,0 +1,56 @@
# Event handler
import sys
from typing import Callable
from typing import Dict
from typing import List
from .lttng_models import EventMetadata
from .lttng_models import get_field
from .lttng_models import get_name
class EventHandler():
"""Base event handling class."""
def __init__(self, handler_map: Dict[str, Callable[[Dict, EventMetadata], None]]) -> None:
"""
Constructor.
:param handler_map: the mapping from event name to handling method
"""
self._handler_map = handler_map
def handle_events(self, events: List[Dict[str, str]]) -> None:
"""
Handle events by calling their handlers.
:param events: the events to process
"""
for event in events:
self._handle(event)
def _handle(self, event: Dict[str, str]) -> None:
event_name = get_name(event)
handler_function = self._handler_map.get(event_name, None)
if handler_function is not None:
pid = get_field(
event,
'vpid',
default=get_field(
event,
'pid',
raise_if_not_found=False))
tid = get_field(
event,
'vtid',
default=get_field(
event,
'tid',
raise_if_not_found=False))
timestamp = get_field(event, '_timestamp')
procname = get_field(event, 'procname')
metadata = EventMetadata(event_name, pid, tid, timestamp, procname)
handler_function(event, metadata)
else:
print(f'unhandled event name: {event_name}', file=sys.stderr)

View file

@ -0,0 +1,22 @@
import pickle
from typing import Dict
from typing import List
def load_pickle(pickle_file_path: str) -> List[Dict]:
"""
Load pickle file containing converted trace events.
:param pickle_file_path: the path to the pickle file to load
:return: the list of events read from the file
"""
events = []
with open(pickle_file_path, 'rb') as f:
p = pickle.Unpickler(f)
while True:
try:
events.append(p.load())
except EOFError:
break # we're done
return events

View file

@ -0,0 +1,47 @@
# Model objects for LTTng traces/events
from typing import Any
from typing import Dict
def get_field(event: Dict, field_name: str, default=None, raise_if_not_found=True) -> Any:
field_value = event.get(field_name, default)
# If enabled, raise exception as soon as possible to avoid headaches
if raise_if_not_found and field_value is None:
raise AttributeError(f'event field "{field_name}" not found!')
return field_value
def get_name(event: Dict) -> str:
return get_field(event, '_name')
class EventMetadata():
"""Container for event metadata."""
def __init__(self, event_name, pid, tid, timestamp, procname) -> None:
self._event_name = event_name
self._pid = pid
self._tid = tid
self._timestamp = timestamp
self._procname = procname
@property
def event_name(self):
return self._event_name
@property
def pid(self):
return self._pid
@property
def tid(self):
return self._tid
@property
def timestamp(self):
return self._timestamp
@property
def procname(self):
return self._procname

View file

@ -0,0 +1,170 @@
# Process trace events and create ROS model
from typing import Dict
from typing import List
from .data_model import DataModel
from .handler import EventHandler
from .lttng_models import get_field
from .lttng_models import EventMetadata
class Ros2Processor(EventHandler):
"""
ROS 2-aware event processing/handling class implementation.
Handles a trace's events and builds a model with the data.
"""
def __init__(self) -> None:
# Link a ROS trace event to its corresponding handling method
handler_map = {
'ros2:rcl_init':
self._handle_rcl_init,
'ros2:rcl_node_init':
self._handle_rcl_node_init,
'ros2:rcl_publisher_init':
self._handle_rcl_publisher_init,
'ros2:rcl_subscription_init':
self._handle_subscription_init,
'ros2:rclcpp_subscription_callback_added':
self._handle_rclcpp_subscription_callback_added,
'ros2:rcl_service_init':
self._handle_rcl_service_init,
'ros2:rclcpp_service_callback_added':
self._handle_rclcpp_service_callback_added,
'ros2:rcl_client_init':
self._handle_rcl_client_init,
'ros2:rcl_timer_init':
self._handle_rcl_timer_init,
'ros2:rclcpp_timer_callback_added':
self._handle_rclcpp_timer_callback_added,
'ros2:rclcpp_callback_register':
self._handle_rclcpp_callback_register,
'ros2:callback_start':
self._handle_callback_start,
'ros2:callback_end':
self._handle_callback_end,
}
super().__init__(handler_map)
self._data = DataModel()
# Temporary buffers
self._callback_instances = {}
def get_data_model(self) -> DataModel:
return self._data
def _handle_rcl_init(self, event: Dict, metadata: EventMetadata) -> None:
context_handle = get_field(event, 'context_handle')
timestamp = metadata.timestamp
pid = metadata.pid
version = get_field(event, 'version')
self._data.add_context(context_handle, timestamp, pid, version)
def _handle_rcl_node_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'node_handle')
timestamp = metadata.timestamp
tid = metadata.tid
rmw_handle = get_field(event, 'rmw_handle')
name = get_field(event, 'node_name')
namespace = get_field(event, 'namespace')
self._data.add_node(handle, timestamp, tid, rmw_handle, name, namespace)
def _handle_rcl_publisher_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'publisher_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
rmw_handle = get_field(event, 'rmw_publisher_handle')
topic_name = get_field(event, 'topic_name')
depth = get_field(event, 'queue_depth')
self._data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
def _handle_subscription_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'subscription_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
rmw_handle = get_field(event, 'rmw_subscription_handle')
topic_name = get_field(event, 'topic_name')
depth = get_field(event, 'queue_depth')
self._data.add_subscription(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
def _handle_rclcpp_subscription_callback_added(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'subscription_handle')
timestamp = metadata.timestamp
callback_object = get_field(event, 'callback')
self._data.add_callback_object(handle, timestamp, callback_object)
def _handle_rcl_service_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'service_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
rmw_handle = get_field(event, 'rmw_service_handle')
service_name = get_field(event, 'service_name')
self._data.add_service(handle, timestamp, node_handle, rmw_handle, service_name)
def _handle_rclcpp_service_callback_added(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'service_handle')
timestamp = metadata.timestamp
callback_object = get_field(event, 'callback')
self._data.add_callback_object(handle, timestamp, callback_object)
def _handle_rcl_client_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'client_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
rmw_handle = get_field(event, 'rmw_client_handle')
service_name = get_field(event, 'service_name')
self._data.add_client(handle, timestamp, node_handle, rmw_handle, service_name)
def _handle_rcl_timer_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'timer_handle')
timestamp = metadata.timestamp
period = get_field(event, 'period')
self._data.add_timer(handle, timestamp, period)
def _handle_rclcpp_timer_callback_added(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'timer_handle')
timestamp = metadata.timestamp
callback_object = get_field(event, 'callback')
self._data.add_callback_object(handle, timestamp, callback_object)
def _handle_rclcpp_callback_register(self, event: Dict, metadata: EventMetadata) -> None:
callback_object = get_field(event, 'callback')
timestamp = metadata.timestamp
symbol = get_field(event, 'symbol')
self._data.add_callback_symbol(callback_object, timestamp, symbol)
def _handle_callback_start(self, event: Dict, metadata: EventMetadata) -> None:
# Add to dict
callback_addr = get_field(event, 'callback')
self._callback_instances[callback_addr] = (event, metadata)
def _handle_callback_end(self, event: Dict, metadata: EventMetadata) -> None:
# Fetch from dict
callback_object = get_field(event, 'callback')
(event_start, metadata_start) = self._callback_instances.get(callback_object)
if event_start is not None and metadata_start is not None:
del self._callback_instances[callback_object]
duration = metadata.timestamp - metadata_start.timestamp
is_intra_process = get_field(event_start, 'is_intra_process', raise_if_not_found=False)
self._data.add_callback_instance(
callback_object,
metadata_start.timestamp,
duration,
bool(is_intra_process))
else:
print(f'No matching callback start for callback object "{callback_object}"')
def ros2_process(events: List[Dict[str, str]]) -> Ros2Processor:
"""
Process unpickled events and create ROS 2 model.
:param events: the list of events
:return: the processor object
"""
processor = Ros2Processor()
processor.handle_events(events)
return processor

View file

@ -0,0 +1,64 @@
# CTF to pickle conversion
from pickle import Pickler
import babeltrace
# List of ignored CTF fields
_IGNORED_FIELDS = [
'content_size', 'cpu_id', 'events_discarded', 'id', 'packet_size', 'packet_seq_num',
'stream_id', 'stream_instance_id', 'timestamp_end', 'timestamp_begin', 'magic', 'uuid', 'v'
]
_DISCARD = 'events_discarded'
def ctf_to_pickle(trace_directory: str, target: Pickler) -> int:
"""
Load CTF trace and convert to a pickle file.
:param trace_directory: the main/top trace directory
:param target: the target pickle file to write to
:return: the number of events written
"""
# add traces
tc = babeltrace.TraceCollection()
print(f'Importing trace directory: {trace_directory}')
tc.add_traces_recursive(trace_directory, 'ctf')
count = 0
count_written = 0
# count_pid_matched = 0
# traced = set()
# PID_KEYS = ['vpid', 'pid']
for event in tc.events:
count += 1
# pid = None
# for key in PID_KEYS:
# if key in event.keys():
# pid = event[key]
# break
# Write all for now
pod = _ctf_event_to_pod(event)
target.dump(pod)
count_written += 1
return count_written
def _ctf_event_to_pod(ctf_event):
"""
Convert name, timestamp, and all other keys except those in IGNORED_FIELDS into a dictionary.
:param ctf_element: The element to convert
:type ctf_element: babeltrace.Element
:return:
:return type: dict
"""
pod = {'_name': ctf_event.name, '_timestamp': ctf_event.timestamp}
if hasattr(ctf_event, _DISCARD) and ctf_event[_DISCARD] > 0:
print(ctf_event[_DISCARD])
for key in [key for key in ctf_event.keys() if key not in _IGNORED_FIELDS]:
pod[key] = ctf_event[key]
return pod

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python3
# Entrypoint/script to convert CTF trace data to a pickle file
import argparse
from pickle import Pickler
import time
from tracetools_analysis.conversion import ctf
def parse_args():
parser = argparse.ArgumentParser(description='Convert CTF trace data to a pickle file.')
parser.add_argument('trace_directory', help='the path to the main CTF trace directory')
parser.add_argument('pickle_file', help='the target pickle file to generate')
return parser.parse_args()
def main():
args = parse_args()
trace_directory = args.trace_directory
pickle_target_file = args.pickle_file
start_time = time.time()
with open(pickle_target_file, 'wb') as f:
p = Pickler(f, protocol=4)
count = ctf.ctf_to_pickle(trace_directory, p)
time_diff = time.time() - start_time
print(f'converted {count} events in {time_diff * 1000:.2f} ms')

View file

@ -0,0 +1,29 @@
#!/usr/bin/env python3
# Entrypoint/script to process events from a pickle file to build a ROS model
import argparse
import time
from tracetools_analysis.analysis import load
from tracetools_analysis.analysis import ros2_processor
def parse_args():
parser = argparse.ArgumentParser(description='Process a pickle file generated '
'from tracing and analyze the data.')
parser.add_argument('pickle_file', help='the pickle file to import')
return parser.parse_args()
def main():
args = parse_args()
pickle_filename = args.pickle_file
start_time = time.time()
events = load.load_pickle(pickle_filename)
processor = ros2_processor.ros2_process(events)
time_diff = time.time() - start_time
print(f'processed {len(events)} events in {time_diff * 1000:.2f} ms')
processor.get_data_model().print_model()