Extract trace reading functions

This commit is contained in:
Christophe Bedard 2019-06-24 16:06:17 +02:00
parent cc5109f3e2
commit de8c131a34
12 changed files with 252 additions and 80 deletions

3
tracetools_read/.gitignore vendored Normal file
View file

@ -0,0 +1,3 @@
*~
*.pyc

View file

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<name>tracetools_read</name>
<version>0.0.1</version>
<description>Tools for reading traces</description>
<maintainer email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</maintainer>
<maintainer email="ingo.luetkebohle@de.bosch.com">Ingo Lütkebohle</maintainer>
<license>Apache 2.0</license>
<author email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</author>
<exec_depend>python3-babeltrace</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View file

@ -0,0 +1,4 @@
[develop]
script-dir=$base/lib/tracetools_read
[install]
install-scripts=$base/lib/tracetools_read

29
tracetools_read/setup.py Normal file
View file

@ -0,0 +1,29 @@
from setuptools import find_packages
from setuptools import setup
package_name = 'tracetools_read'
setup(
name=package_name,
version='0.0.1',
packages=find_packages(exclude=['test']),
data_files=[
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
maintainer=(
'Christophe Bedard, '
'Ingo Lütkebohle'
),
maintainer_email=(
'fixed-term.christophe.bourquebedard@de.bosch.com, '
'ingo.luetkebohle@de.bosch.com'
),
author='Christophe Bedard',
author_email='fixed-term.christophe.bourquebedard@de.bosch.com',
# url='',
keywords=['ROS'],
description='Tools for reading trace',
license='Apache 2.0',
tests_require=['pytest'],
)

View file

@ -0,0 +1,23 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View file

@ -0,0 +1,23 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc = main(argv=[])
assert rc == 0, 'Found errors'

View file

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=[])
assert rc == 0, 'Found code style errors / warnings'

View file

@ -0,0 +1,13 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View file

@ -0,0 +1,101 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module with functions for reading traces."""
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
import babeltrace
DictEvent = Dict[str, Any]
def _get_trace_ctf_events(trace_directory: str) -> Iterable[babeltrace.babeltrace.Event]:
"""
Get the events of a trace.
:param trace_directory: the path to the main/top trace directory
:return: events iterable
"""
tc = babeltrace.TraceCollection()
tc.add_traces_recursive(trace_directory, 'ctf')
return tc.events
def get_trace_events(trace_directory: str) -> List[DictEvent]:
"""
Get the events of a trace.
:param trace_directory: the path to the main/top trace directory
:return: events
"""
return [event_to_dict(event) for event in _get_trace_ctf_events(trace_directory)]
# List of ignored CTF fields
_IGNORED_FIELDS = [
'content_size',
'cpu_id',
'events_discarded',
'id',
'packet_size',
'packet_seq_num',
'stream_id',
'stream_instance_id',
'timestamp_end',
'timestamp_begin',
'magic',
'uuid',
'v',
]
_DISCARD = 'events_discarded'
def event_to_dict(event: babeltrace.babeltrace.Event) -> DictEvent:
"""
Convert name, timestamp, and all other keys except those in IGNORED_FIELDS into a dictionary.
:param event: the event to convert
:return: the event as a dictionary
"""
d = {'_name': event.name, '_timestamp': event.timestamp}
if hasattr(event, _DISCARD) and event[_DISCARD] > 0:
print(event[_DISCARD])
for key in [key for key in event.keys() if key not in _IGNORED_FIELDS]:
d[key] = event[key]
return d
def get_field(event: DictEvent, field_name: str, default=None, raise_if_not_found=True) -> Any:
field_value = event.get(field_name, default)
# If enabled, raise exception as soon as possible to avoid headaches
if raise_if_not_found and field_value is None:
raise AttributeError(f'event field "{field_name}" not found!')
return field_value
def get_event_name(event: DictEvent) -> str:
return event['_name']
def get_event_timestamp(event: DictEvent) -> int:
return event['_timestamp']
def get_procname(event: DictEvent) -> str:
return event['procname']

View file

@ -27,6 +27,7 @@
<test_depend>python3-pytest</test_depend>
<test_depend>tracetools</test_depend>
<test_depend>tracetools_trace</test_depend>
<test_depend>tracetools_read</test_depend>
<export>
<build_type>ament_cmake</build_type>

View file

@ -20,14 +20,15 @@ from typing import List
from typing import Union
import unittest
from tracetools_read.utils import DictEvent
from tracetools_read.utils import get_event_name
from tracetools_read.utils import get_event_timestamp
from tracetools_read.utils import get_field
from tracetools_read.utils import get_procname
from tracetools_read.utils import get_trace_events
from .utils import cleanup_trace
from .utils import DictEvent
from .utils import get_event_name
from .utils import get_event_names
from .utils import get_event_timestamp
from .utils import get_field
from .utils import get_procname
from .utils import get_trace_events
from .utils import run_and_trace

View file

@ -17,16 +17,15 @@
import os
import shutil
import time
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple
import babeltrace
from launch import LaunchDescription
from launch import LaunchService
from launch_ros import get_default_launch_description
import launch_ros.actions
from tracetools_read.utils import DictEvent
from tracetools_read.utils import get_event_name
from tracetools_trace.tools import lttng
@ -82,81 +81,11 @@ def cleanup_trace(full_path: str) -> None:
shutil.rmtree(full_path)
DictEvent = Dict[str, Any]
def get_trace_events(trace_directory: str) -> List[DictEvent]:
"""
Get the events of a trace.
:param trace_directory: the path to the main/top trace directory
:return: events
"""
tc = babeltrace.TraceCollection()
tc.add_traces_recursive(trace_directory, 'ctf')
return [_event_to_dict(event) for event in tc.events]
# List of ignored CTF fields
_IGNORED_FIELDS = [
'content_size',
'cpu_id',
'events_discarded',
'id',
'packet_size',
'packet_seq_num',
'stream_id',
'stream_instance_id',
'timestamp_end',
'timestamp_begin',
'magic',
'uuid',
'v',
]
_DISCARD = 'events_discarded'
def _event_to_dict(event: babeltrace.babeltrace.Event) -> DictEvent:
"""
Convert name, timestamp, and all other keys except those in IGNORED_FIELDS into a dictionary.
:param event: the event to convert
:return: the event as a dictionary
"""
d = {'_name': event.name, '_timestamp': event.timestamp}
if hasattr(event, _DISCARD) and event[_DISCARD] > 0:
print(event[_DISCARD])
for key in [key for key in event.keys() if key not in _IGNORED_FIELDS]:
d[key] = event[key]
return d
def get_event_names(events: List[DictEvent]) -> List[str]:
"""
Get a list of names of the events in the trace.
Get a list of events names from a list of events.
:param events: the events of the trace
:return: the list of event names
"""
return [get_event_name(e) for e in events]
def get_field(event: DictEvent, field_name: str, default=None, raise_if_not_found=True) -> Any:
field_value = event.get(field_name, default)
# If enabled, raise exception as soon as possible to avoid headaches
if raise_if_not_found and field_value is None:
raise AttributeError(f'event field "{field_name}" not found!')
return field_value
def get_event_name(event: DictEvent) -> str:
return event['_name']
def get_event_timestamp(event: DictEvent) -> int:
return event['_timestamp']
def get_procname(event: DictEvent) -> str:
return event['procname']