took the tracetools_read and tracetools_trace from upstream (rolling)

This commit is contained in:
Niklas Halle 2025-05-20 16:27:27 +02:00
parent e8637c9043
commit 1b96054945
25 changed files with 1555 additions and 438 deletions

View file

@ -2,11 +2,68 @@
Changelog for package tracetools_read
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1.0.6 (2023-05-27)
8.7.0 (2025-04-24)
------------------
* Merge branch 'version-1-0-5' into 'foxy'
8.6.0 (2025-04-17)
------------------
* Improve Python typing annotations (`#152 <https://github.com/ros2/ros2_tracing/issues/152>`_)
* Expose types for tracing tools (`#153 <https://github.com/ros2/ros2_tracing/issues/153>`_)
* Contributors: Christophe Bedard, Michael Carlstrom
8.5.0 (2024-12-20)
------------------
8.4.1 (2024-11-25)
------------------
8.4.0 (2024-10-15)
------------------
8.3.0 (2024-04-26)
------------------
8.2.0 (2024-04-16)
------------------
* Replace all occurences of index.ros.org (`#114 <https://github.com/ros2/ros2_tracing/issues/114>`_)
* Improve tracetools_test and simplify test_tracetools code (`#109 <https://github.com/ros2/ros2_tracing/issues/109>`_)
* Contributors: Chris Lalancette, Christophe Bedard
8.1.0 (2024-03-27)
------------------
* Allow tracing tests to be run in parallel with other tests (`#95 <https://github.com/ros2/ros2_tracing/issues/95>`_)
* Contributors: Christophe Bedard
8.0.0 (2024-01-23)
------------------
7.1.0 (2023-08-23)
------------------
7.0.0 (2023-06-09)
------------------
6.4.1 (2023-05-11)
------------------
6.4.0 (2023-04-28)
------------------
6.3.0 (2023-04-18)
------------------
6.2.0 (2023-04-18)
------------------
6.1.0 (2023-04-13)
------------------
6.0.0 (2023-04-12)
------------------
5.1.0 (2023-03-02)
------------------
0.2.11 (2019-12-09)
-------------------
* Register Python packages in the ament index

View file

@ -1,12 +1,15 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>tracetools_read</name>
<version>1.0.6</version>
<version>8.7.0</version>
<description>Tools for reading traces.</description>
<maintainer email="bedard.christophe@gmail.com">Christophe Bedard</maintainer>
<maintainer email="ingo.luetkebohle@de.bosch.com">Ingo Luetkebohle</maintainer>
<license>Apache 2.0</license>
<url type="website">https://docs.ros.org/en/rolling/p/tracetools_read/</url>
<url type="repository">https://github.com/ros2/ros2_tracing</url>
<url type="bugtracker">https://github.com/ros2/ros2_tracing/issues</url>
<author email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</author>
<exec_depend>python3-babeltrace</exec_depend>

View file

@ -1,4 +1,6 @@
[develop]
script-dir=$base/lib/tracetools_read
script_dir=$base/lib/tracetools_read
[install]
install-scripts=$base/lib/tracetools_read
install_scripts=$base/lib/tracetools_read
[mypy]
warn-unused-ignores = True

View file

@ -5,7 +5,7 @@ package_name = 'tracetools_read'
setup(
name=package_name,
version='1.0.6',
version='8.7.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/' + package_name, ['package.xml']),
@ -23,7 +23,7 @@ setup(
),
author='Christophe Bedard',
author_email='fixed-term.christophe.bourquebedard@de.bosch.com',
url='https://gitlab.com/ros-tracing/ros2_tracing',
url='https://github.com/ros2/ros2_tracing',
keywords=[],
description='Tools for reading traces.',
license='Apache 2.0',

View file

@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc = main(argv=[])
assert rc == 0, 'Found errors'
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View file

@ -16,6 +16,7 @@
from typing import Any
from typing import Dict
from typing import List
DictEvent = Dict[str, Any]
@ -54,5 +55,45 @@ def get_event_timestamp(event: DictEvent) -> int:
return event['_timestamp']
def get_event_pid(event: DictEvent) -> int:
return event['vpid']
def get_procname(event: DictEvent) -> str:
return event['procname']
def get_tid(event: DictEvent) -> str:
return event['vtid']
def get_events_with_name(
event_name: str,
events: List[DictEvent],
) -> List[DictEvent]:
"""
Get all events with the given name.
:param event_name: the event name
:param events: the events to check
:return: the list of events with the given name
"""
return [e for e in events if get_event_name(e) == event_name]
def get_events_with_field_value(
field_name: str,
field_values: Any,
events: List[DictEvent],
) -> List[DictEvent]:
"""
Get all events with the given field:value.
:param field_name: the name of the field to check
:param field_values: the value(s) of the field to check
:param events: the events to check
:return: the events with the given field:value pair
"""
if not isinstance(field_values, (list, set)):
field_values = [field_values]
return [e for e in events if get_field(e, field_name, None) in field_values]

View file

@ -0,0 +1 @@

View file

@ -2,11 +2,126 @@
Changelog for package tracetools_trace
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1.0.6 (2023-05-27)
8.7.0 (2025-04-24)
------------------
8.6.0 (2025-04-17)
------------------
* Improve Python typing annotations (`#152 <https://github.com/ros2/ros2_tracing/issues/152>`_)
* Expose types for tracing tools (`#153 <https://github.com/ros2/ros2_tracing/issues/153>`_)
* Remove unnecessary 'type: ignore' comments in tracetools_trace (`#151 <https://github.com/ros2/ros2_tracing/issues/151>`_)
* Contributors: Christophe Bedard, Michael Carlstrom
8.5.0 (2024-12-20)
------------------
* Instrument client/service for end-to-end request/response tracking (`#145 <https://github.com/ros2/ros2_tracing/issues/145>`_)
* Contributors: Christophe Bedard
8.4.1 (2024-11-25)
------------------
8.4.0 (2024-10-15)
------------------
* Allow enabling syscalls through ``ros2 trace`` or the Trace action (`#137 <https://github.com/ros2/ros2_tracing/issues/137>`_)
* Contributors: Christophe Bedard
8.3.0 (2024-04-26)
------------------
8.2.0 (2024-04-16)
------------------
* Replace all occurences of index.ros.org (`#114 <https://github.com/ros2/ros2_tracing/issues/114>`_)
* Contributors: Chris Lalancette
8.1.0 (2024-03-27)
------------------
* Improve tracing configuration error reporting (`#85 <https://github.com/ros2/ros2_tracing/issues/85>`_)
* Add a space in between not and parentheses. (`#88 <https://github.com/ros2/ros2_tracing/issues/88>`_)
* Contributors: Chris Lalancette, Christophe Bedard
8.0.0 (2024-01-23)
------------------
* Switch to custom lttng-ctl Python bindings (`#81 <https://github.com/ros2/ros2_tracing/issues/81>`_)
* Contributors: Christophe Bedard
7.1.0 (2023-08-23)
------------------
* Create start/pause/resume/stop sub-commands for 'ros2 trace' (`#70 <https://github.com/ros2/ros2_tracing/issues/70>`_)
* Contributors: Christophe Bedard
7.0.0 (2023-06-09)
------------------
6.4.1 (2023-05-11)
------------------
* Detect issue with LTTng and Docker and report error when tracing (`#66 <https://github.com/ros2/ros2_tracing/issues/66>`_)
* Contributors: Christophe Bedard
6.4.0 (2023-04-28)
------------------
6.3.0 (2023-04-18)
------------------
6.2.0 (2023-04-18)
------------------
* Error out if trace already exists unless 'append' option is used (`#58 <https://github.com/ros2/ros2_tracing/issues/58>`_)
* Improve 'ros2 trace' command error handling & add end-to-end tests (`#54 <https://github.com/ros2/ros2_tracing/issues/54>`_)
* Make subbuffer size configurable with Trace action (`#51 <https://github.com/ros2/ros2_tracing/issues/51>`_)
* Contributors: Christophe Bedard, Christopher Wecht
6.1.0 (2023-04-13)
------------------
* Add intra-process tracepoints (`#30 <https://github.com/ros2/ros2_tracing/issues/30>`_)
* Contributors: ymski
6.0.0 (2023-04-12)
------------------
* Allow requiring minimum lttng package version for is_lttng_installed (`#59 <https://github.com/ros2/ros2_tracing/issues/59>`_)
* Include tracepoints by default on Linux (`#31 <https://github.com/ros2/ros2_tracing/issues/31>`_)
* Enable document generation using rosdoc2 for ament_python pkgs (`#50 <https://github.com/ros2/ros2_tracing/issues/50>`_)
* Contributors: Christophe Bedard, Yadu
5.1.0 (2023-03-02)
------------------
5.0.0 (2023-02-14)
------------------
* Replace distutils.version.StrictVersion with packaging.version.Version (`#42 <https://github.com/ros2/ros2_tracing/issues/42>`_)
* Remove deprecated context_names parameter (`#38 <https://github.com/ros2/ros2_tracing/issues/38>`_)
* Contributors: Christophe Bedard
4.0.0 (2022-01-20)
------------------
* Disable kernel tracing by default
* Don't require kernel tracer and detect when it's not installed
* Introduce constants for tracepoint names
* Optimize default tracing session channel config values
* Deprecate 'context_names' param and replace with 'context_fields'
* Support per-domain context fields for the Trace action
* Contributors: Christophe Bedard
3.1.0 (2021-08-11)
------------------
* Add support for rmw init/pub, take, and executor tracepoints
* Contributors: Christophe Bedard
2.2.0 (2021-03-29)
------------------
* Add support for rcl_publish and rclcpp_publish tracepoints
* Contributors: Christophe Bedard
2.1.0 (2021-01-13)
------------------
* Merge branch 'foxy-backport-fix-flake8-blind-exception-error' into 'foxy'
* Fix flake8 blind except error by using more concrete types
* Merge branch 'version-1-0-5' into 'foxy'
* Allow configuring tracing directory through environment variables
* Cleanly stop ros2trace/tracetools_trace tracing on SIGINT
* Add instrumentation support for linking a timer to a node
* Contributors: Christophe Bedard
2.0.0 (2020-10-12)
------------------
* Add lifecycle node state transition instrumentation
* Contributors: Christophe Bedard, Ingo Lütkebohle
1.0.1 (2020-05-27)

View file

@ -1,15 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format2.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="2">
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>tracetools_trace</name>
<version>1.0.6</version>
<version>8.7.0</version>
<description>Tools for setting up tracing sessions.</description>
<maintainer email="bedard.christophe@gmail.com">Christophe Bedard</maintainer>
<maintainer email="ingo.luetkebohle@de.bosch.com">Ingo Luetkebohle</maintainer>
<license>Apache 2.0</license>
<url type="website">https://docs.ros.org/en/rolling/p/tracetools_trace/</url>
<url type="repository">https://github.com/ros2/ros2_tracing</url>
<url type="bugtracker">https://github.com/ros2/ros2_tracing/issues</url>
<author email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</author>
<exec_depend>python3-lttng</exec_depend>
<exec_depend>lttngpy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View file

@ -1,4 +1,6 @@
[develop]
script-dir=$base/lib/tracetools_trace
script_dir=$base/lib/tracetools_trace
[install]
install-scripts=$base/lib/tracetools_trace
install_scripts=$base/lib/tracetools_trace
[mypy]
warn-unused-ignores = True

View file

@ -5,7 +5,7 @@ package_name = 'tracetools_trace'
setup(
name=package_name,
version='1.0.6',
version='8.7.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/' + package_name, ['package.xml']),
@ -23,7 +23,7 @@ setup(
),
author='Christophe Bedard',
author_email='fixed-term.christophe.bourquebedard@de.bosch.com',
url='https://gitlab.com/ros-tracing/ros2_tracing',
url='https://github.com/ros2/ros2_tracing',
keywords=[],
description='Tools for setting up tracing sessions.',
long_description=(

View file

@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc = main(argv=[])
assert rc == 0, 'Found errors'
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View file

@ -0,0 +1,167 @@
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile
import unittest
from unittest import mock
from packaging.version import Version
from tracetools_trace.tools.lttng import is_lttng_installed
@unittest.skipIf(not is_lttng_installed(), 'LTTng is required')
class TestLttngTracing(unittest.TestCase):
def __init__(self, *args) -> None:
super().__init__(
*args,
)
def test_is_lttng_installed(self):
# Different OS
with mock.patch('platform.system', return_value='Windows'):
self.assertFalse(is_lttng_installed())
# lttng-ctl or version not found
with mock.patch('tracetools_trace.tools.lttng.get_lttng_version', return_value=None):
self.assertFalse(is_lttng_installed())
# Minimum version requirement
with mock.patch(
'tracetools_trace.tools.lttng.get_lttng_version',
return_value=Version('1.2.3'),
):
self.assertFalse(is_lttng_installed(minimum_version='1.2.4'))
self.assertTrue(is_lttng_installed(minimum_version='1.2.3'))
self.assertTrue(is_lttng_installed())
def test_lttng_not_installed(self):
from tracetools_trace.tools.lttng import lttng_init
with mock.patch('tracetools_trace.tools.lttng.is_lttng_installed', return_value=False):
self.assertIsNone(lttng_init(session_name='test-session', base_path='/tmp'))
def test_no_kernel_tracer(self):
from tracetools_trace.tools.lttng_impl import setup
with (
mock.patch(
'tracetools_trace.tools.lttng_impl.is_session_daemon_not_alive',
return_value=False,
),
mock.patch(
'tracetools_trace.tools.lttng_impl.is_session_daemon_unreachable',
return_value=False,
),
mock.patch(
'tracetools_trace.tools.lttng_impl.is_kernel_tracer_available',
return_value=False,
),
):
with self.assertRaises(RuntimeError):
setup(
session_name='test-session',
base_path='/tmp',
kernel_events=['sched_switch'],
)
with self.assertRaises(RuntimeError):
setup(
session_name='test-session',
base_path='/tmp',
syscalls=['open'],
)
def test_get_lttng_home(self):
from tracetools_trace.tools.lttng_impl import get_lttng_home
# Uses $LTTNG_HOME if set
environ = {'LTTNG_HOME': 'the_lttng_home', 'HOME': 'the_home'}
with mock.patch.dict(os.environ, environ, clear=True):
self.assertEqual('the_lttng_home', get_lttng_home())
# Defaults to $HOME if LTTNG_HOME is unset
environ = {'HOME': 'the_home'}
with mock.patch.dict(os.environ, environ, clear=True):
self.assertEqual('the_home', get_lttng_home())
# Returns `None` otherwise
with mock.patch.dict(os.environ, {}, clear=True):
self.assertIsNone(get_lttng_home())
def test_get_session_daemon_pid(self):
from tracetools_trace.tools.lttng_impl import get_session_daemon_pid
# No PID if there is no LTTng home
with mock.patch('tracetools_trace.tools.lttng_impl.get_lttng_home', return_value=None):
self.assertIsNone(get_session_daemon_pid())
# No PID if the PID file doesn't exist
with mock.patch(
'tracetools_trace.tools.lttng_impl.get_lttng_home',
return_value=os.path.join(tempfile.gettempdir(), 'doesnt_exist'),
):
self.assertIsNone(get_session_daemon_pid())
# PID file exists...
with (
mock.patch(
'tracetools_trace.tools.lttng_impl.get_lttng_home',
return_value='some_non-None_value',
),
mock.patch('os.path.isfile', return_value=True),
):
# ...but is not a valid int
with mock.patch('builtins.open', mock.mock_open(read_data='')):
self.assertIsNone(get_session_daemon_pid())
with mock.patch('builtins.open', mock.mock_open(read_data='abc')):
self.assertIsNone(get_session_daemon_pid())
# ...and has a valid int when stripped
with mock.patch('builtins.open', mock.mock_open(read_data='123\n')):
self.assertEqual(123, get_session_daemon_pid())
def test_is_session_daemon_unreachable(self):
from tracetools_trace.tools.lttng_impl import is_session_daemon_unreachable
# All good if we can't get the session daemon PID
with mock.patch(
'tracetools_trace.tools.lttng_impl.get_session_daemon_pid',
return_value=None,
):
self.assertFalse(is_session_daemon_unreachable())
# If we can get the session daemon PID...
with mock.patch(
'tracetools_trace.tools.lttng_impl.get_session_daemon_pid',
return_value=123,
):
# Unreachable if we can't find the process with the PID
with mock.patch('subprocess.run') as patched_subprocess_run:
patched_subprocess_run.return_value.returncode = 1
self.assertTrue(is_session_daemon_unreachable())
# Unreachable if we can find the process with the PID, but it is not a session daemon
with mock.patch('subprocess.run') as patched_subprocess_run:
patched_subprocess_run.return_value.returncode = 0
patched_subprocess_run.return_value.stdout = 'some-random-command\n'
self.assertTrue(is_session_daemon_unreachable())
# All good if we can find the process with the PID and it is a session daemon
with mock.patch('subprocess.run') as patched_subprocess_run:
patched_subprocess_run.return_value.returncode = 0
patched_subprocess_run.return_value.stdout = 'lttng-sessiond\n'
self.assertFalse(is_session_daemon_unreachable())
def test_unreachable_session_daemon(self):
from tracetools_trace.tools.lttng_impl import setup
with (
mock.patch(
'tracetools_trace.tools.lttng_impl.is_session_daemon_not_alive',
return_value=False,
),
mock.patch(
'tracetools_trace.tools.lttng_impl.is_session_daemon_unreachable',
return_value=True,
),
):
with self.assertRaises(RuntimeError):
setup(session_name='test-session', base_path='/tmp')

View file

@ -0,0 +1,38 @@
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from tracetools_trace.tools import names
from tracetools_trace.tools import tracepoints
class TestNames(unittest.TestCase):
def __init__(self, *args) -> None:
super().__init__(
*args,
)
def test_tracepoint_names(self) -> None:
# Make sure the list of default ROS events contains exactly all the
# tracepoints defined as constants, otherwise something might have been forgotten
tp_constant_names = {name for name in dir(tracepoints) if not name.startswith('__')}
tp_names = {getattr(tracepoints, name) for name in tp_constant_names}
self.assertTrue(all(name.startswith('ros2:') for name in tp_names), tp_names)
self.assertSetEqual(set(names.DEFAULT_EVENTS_ROS), tp_names)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,71 @@
# Copyright 2020 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the trace directory logic."""
import os
import pathlib
from tracetools_trace.tools.path import get_tracing_directory
def test_get_trace_directory():
os.environ.pop('ROS_TRACE_DIR', None)
os.environ.pop('ROS_HOME', None)
home = pathlib.Path.home()
assert str(home)
# Default case without ROS_TRACE_DIR or ROS_HOME being set (but with HOME)
default_dir = str(home / '.ros/tracing')
assert get_tracing_directory() == default_dir
# Use $ROS_TRACE_DIR if it is set
my_trace_dir_raw = '/my/ROS_TRACE_DIR'
my_trace_dir = str(pathlib.Path(my_trace_dir_raw))
os.environ['ROS_TRACE_DIR'] = my_trace_dir
assert get_tracing_directory() == my_trace_dir
# Make sure it converts path separators when necessary
os.environ['ROS_TRACE_DIR'] = my_trace_dir_raw
assert get_tracing_directory() == my_trace_dir
# Setting ROS_HOME won't change anything since ROS_TRACE_DIR is used first
os.environ['ROS_HOME'] = '/this/wont/be/used'
assert get_tracing_directory() == my_trace_dir
os.environ.pop('ROS_HOME', None)
# Empty is considered unset
os.environ['ROS_TRACE_DIR'] = ''
assert get_tracing_directory() == default_dir
# Make sure '~' is expanded to the home directory
os.environ['ROS_TRACE_DIR'] = '~/tracedir'
assert get_tracing_directory() == str(home / 'tracedir')
os.environ.pop('ROS_TRACE_DIR', None)
# Without ROS_TRACE_DIR, use $ROS_HOME/tracing
fake_ros_home = home / '.fakeroshome'
fake_ros_home_trace_dir = str(fake_ros_home / 'tracing')
os.environ['ROS_HOME'] = str(fake_ros_home)
assert get_tracing_directory() == fake_ros_home_trace_dir
# Make sure it converts path separators when necessary
my_ros_home_raw = '/my/ros/home'
my_ros_home_trace_dir = str(pathlib.Path(my_ros_home_raw) / 'tracing')
os.environ['ROS_HOME'] = my_ros_home_raw
assert get_tracing_directory() == my_ros_home_trace_dir
# Empty is considered unset
os.environ['ROS_HOME'] = ''
assert get_tracing_directory() == default_dir
# Make sure '~' is expanded to the home directory
os.environ['ROS_HOME'] = '~/.fakeroshome'
assert get_tracing_directory() == fake_ros_home_trace_dir
os.environ.pop('ROS_HOME', None)

View file

@ -0,0 +1 @@

View file

@ -20,54 +20,87 @@ from . import names
from . import path
class DefaultArgValueCompleter:
class ArgCompleter:
"""Callable return given value."""
def __init__(self, value):
self.value = value
def __call__(self, **kwargs):
return self.value
class DefaultArgValueCompleter(ArgCompleter):
"""Callable returning an arg's default value."""
def __init__(self, arg):
default = arg.default
self.list = default if isinstance(default, list) else [default]
def __call__(self, **kwargs):
return self.list
super().__init__(arg.default if isinstance(arg.default, list) else [arg.default])
def parse_args() -> argparse.Namespace:
"""Parse args for tracing."""
parser = argparse.ArgumentParser(description='Setup and launch an LTTng tracing session.')
"""Parse arguments for interactive tracing session configuration."""
parser = argparse.ArgumentParser(
description='Trace ROS 2 nodes to get information on their execution')
add_arguments(parser)
return parser.parse_args()
def add_arguments(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'-s', '--session-name', dest='session_name',
default=path.append_timestamp('session'),
help='the name of the tracing session (default: session-YYYYMMDDHHMMSS)')
def _add_arguments_configure(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'-p', '--path', dest='path',
default=path.DEFAULT_BASE_PATH,
help='path of the base directory for trace data (default: %(default)s)')
events_ust_arg = parser.add_argument( # type: ignore
help='path of the base directory for trace data (default: '
'$ROS_TRACE_DIR if ROS_TRACE_DIR is set and not empty, or '
'$ROS_HOME/tracing, using ~/.ros for ROS_HOME if not set or if empty)')
events_ust_arg = parser.add_argument(
'-u', '--ust', nargs='*', dest='events_ust', metavar='EVENT',
default=names.DEFAULT_EVENTS_UST,
help='the userspace events to enable (default: see tracetools_trace.tools.names) '
'[to disable all UST events, '
'provide this flag without any event name]')
events_ust_arg.completer = DefaultArgValueCompleter(events_ust_arg) # type: ignore
events_kernel_arg = parser.add_argument( # type: ignore
events_kernel_arg = parser.add_argument(
'-k', '--kernel', nargs='*', dest='events_kernel', metavar='EVENT',
default=names.DEFAULT_EVENTS_KERNEL,
help='the kernel events to enable (default: see tracetools_trace.tools.names) '
'[to disable all kernel events, '
'provide this flag without any event name]')
events_kernel_arg.completer = DefaultArgValueCompleter(events_kernel_arg) # type: ignore
context_arg = parser.add_argument( # type: ignore
'-c', '--context', nargs='*', dest='context_names', metavar='CONTEXT',
default=[],
help='the kernel events to enable (default: no kernel events)')
events_kernel_arg.completer = ArgCompleter(names.EVENTS_KERNEL) # type: ignore
parser.add_argument(
'--syscall', nargs='*', dest='syscalls', metavar='SYSCALL',
default=[],
help='the syscalls to enable (default: no syscalls)')
context_arg = parser.add_argument(
'-c', '--context', nargs='*', dest='context_fields', metavar='CONTEXT',
default=names.DEFAULT_CONTEXT,
help='the context names to enable (default: see tracetools_trace.tools.names) '
'[to disable all context names, '
'provide this flag without any name]')
help='the context fields to enable (default: see tracetools_trace.tools.names) '
'[to disable all context fields, '
'provide this flag without any field names]')
context_arg.completer = DefaultArgValueCompleter(context_arg) # type: ignore
parser.add_argument(
'-l', '--list', dest='list', action='store_true',
help='display lists of enabled events and context names (default: %(default)s)')
parser.add_argument(
'-a', '--append-trace', dest='append_trace', action='store_true',
help='append to trace if it already exists, otherwise error out (default: %(default)s)')
def _add_arguments_default_session_name(parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'-s', '--session-name', dest='session_name',
default=path.append_timestamp('session'),
help='the name of the tracing session (default: session-YYYYMMDDHHMMSS)')
def add_arguments(parser: argparse.ArgumentParser) -> None:
"""Add arguments to parser for interactive tracing session configuration."""
_add_arguments_default_session_name(parser)
_add_arguments_configure(parser)
def add_arguments_noninteractive(parser: argparse.ArgumentParser) -> None:
"""Add arguments to parser for non-interactive tracing session configuration."""
add_arguments_session_name(parser)
_add_arguments_configure(parser)
def add_arguments_session_name(parser: argparse.ArgumentParser) -> None:
"""Add mandatory session name argument to parser."""
parser.add_argument('session_name', help='the name of the tracing session')

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,104 +16,125 @@
"""Interface for tracing with LTTng."""
import platform
import subprocess
import sys
from typing import List
from typing import Optional
from packaging.version import Version
try:
from . import lttng_impl
_lttng = lttng_impl # type: ignore
# Check lttng module version
from distutils.version import StrictVersion
current_version = _lttng.get_version()
LTTNG_MIN_VERSION = '2.10.7'
if current_version is None or current_version < StrictVersion(LTTNG_MIN_VERSION):
print(
f'lttng module version >={LTTNG_MIN_VERSION} required, found {str(current_version)}',
file=sys.stderr,
)
from . import lttng_impl as _lttng
except ImportError:
# Fall back on stub functions so that this still passes linter checks
from . import lttng_stub
_lttng = lttng_stub # type: ignore
from .names import DEFAULT_CONTEXT
from .names import DEFAULT_EVENTS_KERNEL
from .names import DEFAULT_EVENTS_ROS
from .path import DEFAULT_BASE_PATH
# This will happen if lttngpy isn't found, in which case importing lttng_impl will fail
from . import lttng_stub as _lttng # type: ignore
def lttng_init(
session_name: str,
base_path: str = DEFAULT_BASE_PATH,
ros_events: List[str] = DEFAULT_EVENTS_ROS,
kernel_events: List[str] = DEFAULT_EVENTS_KERNEL,
context_names: List[str] = DEFAULT_CONTEXT,
) -> Optional[str]:
def get_lttng_version() -> Optional[Version]:
"""
Get version of lttng-ctl.
:return: the version of lttng-ctl, or `None` if it is not available
"""
if not hasattr(_lttng, 'get_version') or not callable(_lttng.get_version):
return None
return _lttng.get_version()
def lttng_init(**kwargs) -> Optional[str]:
"""
Set up and start LTTng session.
:param session_name: the name of the session
:param base_path: the path to the directory in which to create the tracing session directory
:param ros_events: list of ROS events to enable
:param kernel_events: list of kernel events to enable
:param context_names: list of context elements to enable
:return: the full path to the trace directory
For the full list of kwargs, see `lttng_impl.setup()`.
Raises RuntimeError on failure, in which case the tracing session might still exist.
:return: the full path to the trace directory, or `None` if initialization failed
"""
trace_directory = _lttng.setup(
session_name,
base_path,
ros_events,
kernel_events,
context_names,
)
_lttng.start(session_name)
if not is_lttng_installed():
return None
trace_directory = _lttng.setup(**kwargs)
if trace_directory is None:
return None
_lttng.start(**kwargs)
return trace_directory
def lttng_fini(
session_name: str,
) -> None:
def lttng_fini(**kwargs) -> None:
"""
Stop and destroy LTTng session.
Raises RuntimeError on failure.
:param session_name: the name of the session
"""
_lttng.stop(session_name)
_lttng.destroy(session_name)
_lttng.stop(**kwargs)
_lttng.destroy(**kwargs)
def is_lttng_installed() -> bool:
def lttng_start(**kwargs) -> None:
"""
Start tracing.
Raises RuntimeError on failure.
:param session_name: the name of the session
"""
_lttng.start(**kwargs)
def lttng_stop(**kwargs) -> None:
"""
Stop tracing.
Raises RuntimeError on failure.
:param session_name: the name of the session
"""
_lttng.stop(**kwargs)
def is_lttng_installed(
*,
minimum_version: Optional[str] = None,
) -> bool:
"""
Check if LTTng is installed.
It first checks if the OS can support LTTng.
If so, it then simply checks if LTTng is installed using the 'lttng' command.
If so, it then checks if lttng-ctl is installed.
:return: True if it is installed, False otherwise
Optionally, a minimum version can also be specified for lttng-ctl.
:param minimum_version: the minimum required lttng-ctl version
:return: True if lttng-ctl is installed, and optionally if the version of lttng-ctl is
sufficient, False otherwise
"""
# Check system
message_doc = (
'Cannot trace. See documentation at: '
'https://gitlab.com/ros-tracing/ros2_tracing'
'https://github.com/ros2/ros2_tracing'
)
system = platform.system()
if 'Linux' != system:
print(f"System '{system}' does not support LTTng.\n{message_doc}", file=sys.stderr)
return False
try:
process = subprocess.Popen(
['lttng', '--version'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# Check if lttng-ctl is installed
lttng_version = get_lttng_version()
if not lttng_version:
print(
f'lttng-ctl (liblttng-ctl-dev) not installed\n{message_doc}',
file=sys.stderr,
)
_, stderr = process.communicate()
if 0 != process.returncode:
raise RuntimeError(stderr.decode())
return True
except (RuntimeError, FileNotFoundError) as e:
print(f'LTTng not found: {e}\n{message_doc}', file=sys.stderr)
return False
# Check if lttng-ctl version is sufficient
if minimum_version and lttng_version < Version(minimum_version):
print(
(
f'lttng-ctl (liblttng-ctl-dev) version >={minimum_version} required, '
f'found {str(lttng_version)}'
),
file=sys.stderr,
)
return False
return True

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,72 +15,216 @@
"""Implementation of the interface for tracing with LTTng."""
from distutils.version import StrictVersion
import re
import os
import shlex
import subprocess
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union
import lttng
from lttngpy import impl as lttngpy
from packaging.version import Version
from .names import CONTEXT_TYPE_CONSTANTS_MAP
from .names import DEFAULT_CONTEXT
from .names import DEFAULT_EVENTS_KERNEL
from .names import DEFAULT_EVENTS_ROS
from .path import DEFAULT_BASE_PATH
from .path import get_full_session_path
from .names import DOMAIN_TYPE_KERNEL
from .names import DOMAIN_TYPE_USERSPACE
def get_version() -> Union[StrictVersion, None]:
def get_version() -> Optional[Version]:
"""
Get the version of the lttng module.
Get version of lttng-ctl.
The module does not have a __version__ attribute, but the version is mentioned in its __doc__,
and seems to be written in a consistent way across versions.
:return: the version as a StrictVersion object, or `None` if it cannot be extracted
:return: the version as a Version object, or `None` if it cannot be extracted
"""
doc_lines = lttng.__doc__.split('\n')
filtered_doc_lines: List[str] = list(filter(None, doc_lines))
if len(filtered_doc_lines) == 0:
if not lttngpy.is_available():
return None
first_line = filtered_doc_lines[0]
version_string = first_line.split(' ')[1]
if not re.compile(r'^[0-9]+\.[0-9]+\.[0-9]+$').match(version_string):
return Version(lttngpy.LTTNG_CTL_VERSION)
def is_kernel_tracer_available() -> bool:
"""
Check if the kernel tracer is available.
This must not be called if `lttngpy.is_available()` is `False`.
:return: `True` if available or `False` if not
"""
return not isinstance(lttngpy.get_tracepoints(domain_type=lttngpy.LTTNG_DOMAIN_KERNEL), int)
def get_lttng_home() -> Optional[str]:
"""
Get the LTTng home value.
$LTTNG_HOME, or $HOME if unset: https://lttng.org/man/1/lttng/v2.13/#doc-_files
:return: the LTTng home value
"""
return os.environ.get('LTTNG_HOME') or os.environ.get('HOME')
def get_session_daemon_pid() -> Optional[int]:
"""
Get the non-root session daemon PID, if there is one.
This does not apply to root session daemons.
:return: the non-root session daemon PID, or `None` if there is none
"""
# When a non-root session daemon is started, its PID is written to .lttng/lttng-sessiond.pid
# under the LTTng home
lttng_home = get_lttng_home()
# If we can't find the home, then we can just assume that there is no session daemon
if not lttng_home:
return None
return StrictVersion(version_string)
# If the file doesn't exist, there is no session daemon
lttng_sessiond_pid = os.path.join(lttng_home, '.lttng', 'lttng-sessiond.pid')
if not os.path.isfile(lttng_sessiond_pid):
return None
with open(lttng_sessiond_pid, 'r') as f:
pid = f.read().strip()
if not pid.isdigit():
return None
return int(pid)
def is_session_daemon_unreachable() -> bool:
"""
Check if the session daemon appears to exist while being unreachable.
This tries to detect cases of this LTTng issue: https://bugs.lttng.org/issues/1371
If this issue happens, LTTng will think that the session daemon exists and will happily trace,
but it will silently not record any trace data, since there is no actual session daemon.
Therefore, if this returns `True`, then tracing will silently not work.
TODO(christophebedard) remove this once Rolling uses a version of LTTng with a fix for this bug
:return: `True` if the session daemon is unreachable, `False` otherwise
"""
pid = get_session_daemon_pid()
# If we can't find the PID, then the session daemon really doesn't exist and we can just create
# one, so it's fine
if pid is None:
return False
# Otherwise, try to look up the process with that PID; if we can't find it, or if it is not
# lttng-sessiond, then it means that the session daemon is unreachable
process = subprocess.run(
shlex.split(f'ps -o comm= {pid}'),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding='utf-8',
)
return 1 == process.returncode or 'lttng-sessiond' != process.stdout.strip()
def is_session_daemon_not_alive() -> bool:
"""
Check if the session daemon isn't alive.
This must not be called if `lttngpy.is_available()` is `False`.
:return: `True` if the session daemon is not alive, or `False` if it is alive
"""
return not lttngpy.is_lttng_session_daemon_alive()
def spawn_session_daemon() -> None:
"""
Try to spawn a session daemon.
Raises RuntimeError if lttng-sessiond is not found.
"""
try:
subprocess.run(['lttng-sessiond', '--daemonize'])
except FileNotFoundError:
raise RuntimeError(
'cannot find lttng-sessiond: on Ubuntu, install lttng-tools and liblttng-ust-dev')
def setup(
*,
session_name: str,
base_path: str = DEFAULT_BASE_PATH,
base_path: str,
append_trace: bool = False,
ros_events: Union[List[str], Set[str]] = DEFAULT_EVENTS_ROS,
kernel_events: Union[List[str], Set[str]] = DEFAULT_EVENTS_KERNEL,
context_names: Union[List[str], Set[str]] = DEFAULT_CONTEXT,
kernel_events: Union[List[str], Set[str]] = [],
syscalls: Union[List[str], Set[str]] = [],
context_fields: Union[List[str], Set[str], Dict[str, List[str]]] = DEFAULT_CONTEXT,
channel_name_ust: str = 'ros2',
channel_name_kernel: str = 'kchan',
subbuffer_size_ust: int = 8 * 4096,
subbuffer_size_kernel: int = 32 * 4096,
) -> Optional[str]:
"""
Set up LTTng session, with events and context.
See: https://lttng.org/docs/#doc-core-concepts
Initialization will fail if the list of kernel events to be
enabled is not empty and if the kernel tracer is not installed.
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure, in which case the tracing session might still exist.
:param session_name: the name of the session
:param base_path: the path to the directory in which to create the tracing session directory
:param base_path: the path to the directory in which to create the tracing session directory,
which will be created if needed
:param append_trace: whether to append to the trace directory if it already exists, otherwise
an error is reported
:param ros_events: list of ROS events to enable
:param kernel_events: list of kernel events to enable
:param context_names: list of context elements to enable
:param syscalls: list of syscalls to enable
these will be part of the kernel channel
:param context_fields: the names of context fields to enable
if it's a list or a set, the context fields are enabled for both kernel and userspace;
if it's a dictionary: { domain type string -> context fields list }
with the domain type string being either `names.DOMAIN_TYPE_KERNEL` or
`names.DOMAIN_TYPE_USERSPACE`
:param channel_name_ust: the UST channel name
:param channel_name_kernel: the kernel channel name
:return: the full path to the trace directory
:param subbuffer_size_ust: the size of the subbuffers for userspace events (defaults to 8 times
the usual page size)
:param subbuffer_size_kernel: the size of the subbuffers for kernel events (defaults to 32
times the usual page size, since there can be way more kernel events than UST events)
:return: the full path to the trace directory, or `None` if initialization failed
"""
# Check if there is a session daemon running
if lttng.session_daemon_alive() == 0:
# Otherwise spawn one without doing any error checks
subprocess.run(
['lttng-sessiond', '--daemonize'],
# Validate parameters
if not session_name:
raise RuntimeError('empty session name')
# Resolve full tracing directory path
full_path = os.path.join(base_path, session_name)
if os.path.isdir(full_path) and not append_trace:
raise RuntimeError(
f'trace directory already exists, use the append option to append to it: {full_path}')
# If there is no session daemon running, try to spawn one
if is_session_daemon_not_alive():
spawn_session_daemon()
# Error out if it looks like there is a session daemon that we can't actually reach
if is_session_daemon_unreachable():
raise RuntimeError(
'lttng-sessiond seems to exist, but is unreachable. '
'If using two containers with the same HOME directory, set the LTTNG_HOME environment '
'variable to the path to a unique directory for each container and make sure that the '
'directory exists. See: https://bugs.lttng.org/issues/1371'
)
# Error out if there is still no session daemon
if is_session_daemon_not_alive():
raise RuntimeError('failed to start lttng session daemon')
# Make sure the kernel tracer is available if there are kernel events, including syscalls
# Do this after spawning a session daemon, otherwise we can't detect the kernel tracer
if 0 < (len(kernel_events) + len(syscalls)) and not is_kernel_tracer_available():
raise RuntimeError(
'kernel tracer is not available:\n'
' cannot use kernel events or syscalls:\n'
" 'ros2 trace' command: cannot use '-k' or '--syscall' options\n"
" 'Trace' action: cannot set 'events_kernel'/'events-kernel' or 'syscalls' lists\n"
' install the kernel tracer, e.g., on Ubuntu, install lttng-modules-dkms\n'
' see: https://github.com/ros2/ros2_tracing#building'
)
# Convert lists to sets
@ -87,261 +232,280 @@ def setup(
ros_events = set(ros_events)
if not isinstance(kernel_events, set):
kernel_events = set(kernel_events)
if not isinstance(context_names, set):
context_names = set(context_names)
if not isinstance(syscalls, set):
syscalls = set(syscalls)
if isinstance(context_fields, list):
context_fields = set(context_fields)
# Resolve full tracing directory path
full_path = get_full_session_path(session_name, base_path=base_path)
ust_enabled = bool(ros_events)
kernel_enabled = bool(kernel_events) or bool(syscalls)
if not (ust_enabled or kernel_enabled):
raise RuntimeError('no events enabled')
ust_enabled = ros_events is not None and len(ros_events) > 0
kernel_enabled = kernel_events is not None and len(kernel_events) > 0
# Create session
# LTTng will create the parent directories if needed
_create_session(
session_name=session_name,
full_path=full_path,
)
# Domains
# Enable channel, events, and contexts for each domain
contexts_dict = _normalize_contexts_dict(context_fields)
if ust_enabled:
domain_ust = lttng.Domain()
domain_ust.type = lttng.DOMAIN_UST
# Per-user buffer
domain_ust.buf_type = lttng.BUFFER_PER_UID
channel_ust = lttng.Channel()
channel_ust.name = channel_name_ust
# Discard, do not overwrite
channel_ust.attr.overwrite = 0
# 8 sub-buffers of 2 times the usual page size
channel_ust.attr.subbuf_size = 2 * 4096
channel_ust.attr.num_subbuf = 8
# Ignore switch timer interval and use read timer instead
channel_ust.attr.switch_timer_interval = 0
channel_ust.attr.read_timer_interval = 200
# mmap channel output instead of splice
channel_ust.attr.output = lttng.EVENT_MMAP
events_list_ust = _create_events(ros_events)
domain = DOMAIN_TYPE_USERSPACE
domain_type = lttngpy.LTTNG_DOMAIN_UST
channel_name = channel_name_ust
_enable_channel(
session_name=session_name,
domain_type=domain_type,
# Per-user buffer
buffer_type=lttngpy.LTTNG_BUFFER_PER_UID,
channel_name=channel_name,
# Discard, do not overwrite
overwrite=0,
# We use 2 sub-buffers because the number of sub-buffers is pointless in discard mode,
# and switching between sub-buffers introduces noticeable CPU overhead
subbuf_size=subbuffer_size_ust,
num_subbuf=2,
# Ignore switch timer interval and use read timer instead
switch_timer_interval=0,
read_timer_interval=200,
# mmap channel output (only option for UST)
output=lttngpy.LTTNG_EVENT_MMAP,
)
_enable_events(
session_name=session_name,
domain_type=domain_type,
event_type=lttngpy.LTTNG_EVENT_TRACEPOINT,
channel_name=channel_name,
events=ros_events,
)
_add_contexts(
session_name=session_name,
domain_type=domain_type,
channel_name=channel_name,
context_fields=contexts_dict.get(domain),
)
if kernel_enabled:
domain_kernel = lttng.Domain()
domain_kernel.type = lttng.DOMAIN_KERNEL
# Global buffer (only option for kernel domain)
domain_kernel.buf_type = lttng.BUFFER_GLOBAL
channel_kernel = lttng.Channel()
channel_kernel.name = channel_name_kernel
# Discard, do not overwrite
channel_kernel.attr.overwrite = 0
# 8 sub-buffers of 8 times the usual page size, since
# there can be way more kernel events than UST events
channel_kernel.attr.subbuf_size = 8 * 4096
channel_kernel.attr.num_subbuf = 8
# Ignore switch timer interval and use read timer instead
channel_kernel.attr.switch_timer_interval = 0
channel_kernel.attr.read_timer_interval = 200
# mmap channel output instead of splice
channel_kernel.attr.output = lttng.EVENT_MMAP
events_list_kernel = _create_events(kernel_events)
# Session
_create_session(session_name, full_path)
# Handles, channels, events
handle_ust = None
if ust_enabled:
handle_ust = _create_handle(session_name, domain_ust)
_enable_channel(handle_ust, channel_ust)
_enable_events(handle_ust, events_list_ust, channel_ust.name)
handle_kernel = None
if kernel_enabled:
handle_kernel = _create_handle(session_name, domain_kernel)
_enable_channel(handle_kernel, channel_kernel)
_enable_events(handle_kernel, events_list_kernel, channel_kernel.name)
# Context
context_list = _create_context_list(context_names)
# TODO make it possible to add context in userspace and kernel separately, since some context
# types might only apply to userspace OR kernel; only consider userspace contexts for now
handles_context = [handle_ust]
enabled_handles: List[lttng.Handle] = list(filter(None, handles_context))
_add_context(enabled_handles, context_list)
domain = DOMAIN_TYPE_KERNEL
domain_type = lttngpy.LTTNG_DOMAIN_KERNEL
channel_name = channel_name_kernel
_enable_channel(
session_name=session_name,
domain_type=domain_type,
# Global buffer (only option for kernel domain)
buffer_type=lttngpy.LTTNG_BUFFER_GLOBAL,
channel_name=channel_name,
# Discard, do not overwrite
overwrite=0,
# We use 2 sub-buffers because the number of sub-buffers is pointless in discard mode,
# and switching between sub-buffers introduces noticeable CPU overhead
subbuf_size=subbuffer_size_kernel,
num_subbuf=2,
# Ignore switch timer interval and use read timer instead
switch_timer_interval=0,
read_timer_interval=200,
# mmap channel output instead of splice
output=lttngpy.LTTNG_EVENT_MMAP,
)
if kernel_events:
_enable_events(
session_name=session_name,
domain_type=domain_type,
event_type=lttngpy.LTTNG_EVENT_TRACEPOINT,
channel_name=channel_name,
events=kernel_events,
)
if syscalls:
_enable_events(
session_name=session_name,
domain_type=domain_type,
event_type=lttngpy.LTTNG_EVENT_SYSCALL,
channel_name=channel_name,
events=syscalls,
)
_add_contexts(
session_name=session_name,
domain_type=domain_type,
channel_name=channel_name,
context_fields=contexts_dict.get(domain),
)
return full_path
def start(
*,
session_name: str,
**kwargs,
) -> None:
"""
Start LTTng session, and check for errors.
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure to start.
:param session_name: the name of the session
"""
result = lttng.start(session_name)
result = lttngpy.lttng_start_tracing(session_name=session_name)
if result < 0:
raise RuntimeError(f'failed to start tracing: {lttng.strerror(result)}')
error = lttngpy.lttng_strerror(result)
raise RuntimeError(f"failed to start tracing session '{session_name}': {error}")
def stop(
*,
session_name: str,
ignore_error: bool = False,
**kwargs,
) -> None:
"""
Stop LTTng session, and check for errors.
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure to stop, unless ignored.
:param session_name: the name of the session
:param ignore_error: whether to ignore any error when stopping
"""
result = lttng.stop(session_name)
if result < 0:
raise RuntimeError(f'failed to stop tracing: {lttng.strerror(result)}')
result = lttngpy.lttng_stop_tracing(session_name=session_name)
if result < 0 and not ignore_error:
error = lttngpy.lttng_strerror(result)
raise RuntimeError(f"failed to stop tracing session '{session_name}': {error}")
def destroy(
*,
session_name: str,
ignore_error: bool = False,
**kwargs,
) -> None:
"""
Destroy LTTng session, and check for errors.
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure to destroy, unless ignored.
:param session_name: the name of the session
:param ignore_error: whether to ignore any error when destroying
"""
result = lttng.destroy(session_name)
if result < 0:
raise RuntimeError(f'failed to destroy tracing session: {lttng.strerror(result)}')
def _create_events(
event_names: Set[str],
) -> List[lttng.Event]:
"""
Create events list from names.
:param event_names: a set of names to create events for
:return: the list of events
"""
events_list = []
for event_name in event_names:
e = lttng.Event()
e.name = event_name
e.type = lttng.EVENT_TRACEPOINT
e.loglevel_type = lttng.EVENT_LOGLEVEL_ALL
events_list.append(e)
return events_list
result = lttngpy.lttng_destroy_session(session_name=session_name)
if result < 0 and not ignore_error:
error = lttngpy.lttng_strerror(result)
raise RuntimeError(f"failed to destroy tracing session '{session_name}': {error}")
def _create_session(
*,
session_name: str,
full_path: str,
) -> None:
"""
Create session from name and full directory path, and check for errors.
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure.
:param session_name: the name of the session
:param full_path: the full path to the main directory to write trace data to
"""
result = lttng.create(session_name, full_path)
LTTNG_ERR_EXIST_SESS = 28
if result == -LTTNG_ERR_EXIST_SESS:
# Sessions seem to persist, so if it already exists,
# just destroy it and try again
destroy(session_name)
result = lttng.create(session_name, full_path)
result = lttngpy.lttng_create_session(
session_name=session_name,
url=full_path,
)
if -lttngpy.LTTNG_ERR_EXIST_SESS.value == result:
# Sessions may persist if there was an error previously, so if it already exists, just
# destroy it and try again
destroy(session_name=session_name)
result = lttngpy.lttng_create_session(
session_name=session_name,
url=full_path,
)
if result < 0:
raise RuntimeError(f'session creation failed: {lttng.strerror(result)}')
error = lttngpy.lttng_strerror(result)
raise RuntimeError(f"failed to create tracing session '{session_name}': {error}")
def _create_handle(
session_name: str,
domain: lttng.Domain,
) -> lttng.Handle:
def _enable_channel(**kwargs) -> None:
"""
Create a handle for a given session name and a domain, and check for errors.
Enable channel, and check for errors.
:param session_name: the name of the session
:param domain: the domain to be used
:return: the handle
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure.
See `lttngpy.enable_channel` for kwargs.
"""
handle = None
handle = lttng.Handle(session_name, domain)
if handle is None:
raise RuntimeError('handle creation failed')
return handle
def _enable_channel(
handle: lttng.Handle,
channel: lttng.Channel,
) -> None:
"""
Enable channel for a handle, and check for errors.
:param handle: the handle to be used
:param channel: the channel to enable
"""
result = lttng.enable_channel(handle, channel)
result = lttngpy.enable_channel(**kwargs)
if result < 0:
raise RuntimeError(f'channel enabling failed: {lttng.strerror(result)}')
session_name = kwargs['session_name']
channel_name = kwargs['channel_name']
error = lttngpy.lttng_strerror(result)
raise RuntimeError(
f"failed to enable channel '{channel_name}' "
f"in tracing session '{session_name}': {error}"
)
def _enable_events(
handle: lttng.Handle,
events_list: List[lttng.Event],
channel_name: str,
) -> None:
def _enable_events(**kwargs) -> None:
"""
Enable events list for a given handle and channel name, and check for errors.
Enable events for a given channel name, and check for errors.
:param handle: the handle to be used
:param events_list: the list of events to enable
:param channel_name: the name of the channel to associate
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure.
See `lttngpy.enable_events` for kwargs.
"""
for event in events_list:
result = lttng.enable_event(handle, event, channel_name)
if result < 0:
raise RuntimeError(f'event enabling failed: {lttng.strerror(result)}')
result = lttngpy.enable_events(**kwargs)
if result < 0:
session_name = kwargs['session_name']
channel_name = kwargs['channel_name']
events = kwargs['events']
error = lttngpy.lttng_strerror(result)
raise RuntimeError(
f"failed to enable event(s) {events} for channel '{channel_name}' "
f"in tracing session '{session_name}': {error}"
)
context_map = {
name: getattr(lttng, name_constant, None) if name_constant is not None else None
for name, name_constant in CONTEXT_TYPE_CONSTANTS_MAP.items()
}
def _context_name_to_type(
context_name: str,
) -> Union[int, None]:
def _normalize_contexts_dict(
context_fields: Union[Set[str], Dict[str, List[str]]],
) -> Dict[str, Set[str]]:
"""
Convert from context name to LTTng enum/constant type.
Normalize context set/dict to dict.
:param context_name: the generic name for the context
:return: the associated type, or `None` if it cannot be found
:param context_fields: the names of context fields to enable
if it's a set, the context fields are enabled for both kernel and userspace;
if it's a dictionary: { domain type string -> context fields list }
with the domain type string being either `names.DOMAIN_TYPE_KERNEL` or
`names.DOMAIN_TYPE_USERSPACE`
:return: a dictionary of domain type name to list of context field names
"""
return context_map.get(context_name, None)
DOMAIN_TYPES = {DOMAIN_TYPE_USERSPACE, DOMAIN_TYPE_KERNEL}
if isinstance(context_fields, dict):
unknown_domain_types = context_fields.keys() - DOMAIN_TYPES
if unknown_domain_types:
raise RuntimeError(f'unknown context domain type(s): {unknown_domain_types}')
return {domain: set(field_names) for domain, field_names in context_fields.items()}
assert isinstance(context_fields, set)
return {domain_type: context_fields for domain_type in DOMAIN_TYPES}
def _create_context_list(
context_names: Set[str],
) -> List[lttng.EventContext]:
def _add_contexts(**kwargs) -> None:
"""
Create context list from names, and check for errors.
Add context lists to given channel, and check for errors.
:param context_names: the set of context names
:return: the event context list
This must not be called if `lttngpy.is_available()` is `False`.
Raises RuntimeError on failure.
See `lttngpy.add_contexts` for kwargs.
"""
context_list = []
for context_name in context_names:
ec = lttng.EventContext()
context_type = _context_name_to_type(context_name)
if context_type is not None:
ec.ctx = context_type
context_list.append(ec)
else:
raise RuntimeError(f'failed to find context type: {context_name}')
return context_list
def _add_context(
handles: List[lttng.Handle],
context_list: List[lttng.EventContext],
) -> None:
"""
Add context list to given handles, and check for errors.
:param handles: the list of handles for which to add context
:param context_list: the list of event contexts to add to the handles
"""
for handle in handles:
for contex in context_list:
result = lttng.add_context(handle, contex, None, None)
if result < 0:
raise RuntimeError(f'failed to add context: {lttng.strerror(result)}')
result = lttngpy.add_contexts(**kwargs)
if result < 0:
session_name = kwargs['session_name']
channel_name = kwargs['channel_name']
domain_type = kwargs['domain_type']
error = lttngpy.lttng_strerror(result)
raise RuntimeError(
f"failed to add context fields for channel '{channel_name}' "
f"and domain '{domain_type}' in tracing session '{session_name}': {error}"
)

View file

@ -15,7 +15,7 @@
"""Stub version of the interface for tracing with LTTng."""
ERROR_MESSAGE = 'lttng module not found, but still tried to use it'
ERROR_MESSAGE = 'LTTng Python bindings not available, but still tried to use them'
def setup(*args, **kwargs) -> None:

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,6 +15,8 @@
"""Lists of names (events, context) for tracing."""
from . import tracepoints
EVENTS_KERNEL = [
'block_rq_complete',
'block_rq_insert',
@ -59,50 +62,54 @@ DEFAULT_EVENTS_KERNEL = [
]
DEFAULT_EVENTS_ROS = [
'ros2:rcl_init',
'ros2:rcl_node_init',
'ros2:rcl_publisher_init',
'ros2:rcl_publish',
'ros2:rclcpp_publish',
'ros2:rcl_subscription_init',
'ros2:rclcpp_subscription_init',
'ros2:rclcpp_subscription_callback_added',
'ros2:rcl_service_init',
'ros2:rclcpp_service_callback_added',
'ros2:rcl_client_init',
'ros2:rcl_timer_init',
'ros2:rclcpp_timer_callback_added',
'ros2:rclcpp_callback_register',
'ros2:callback_start',
'ros2:callback_end',
tracepoints.rcl_init,
tracepoints.rcl_node_init,
tracepoints.rmw_publisher_init,
tracepoints.rcl_publisher_init,
tracepoints.rclcpp_publish,
tracepoints.rclcpp_intra_publish,
tracepoints.rcl_publish,
tracepoints.rmw_publish,
tracepoints.rmw_subscription_init,
tracepoints.rcl_subscription_init,
tracepoints.rclcpp_subscription_init,
tracepoints.rclcpp_subscription_callback_added,
tracepoints.rmw_take,
tracepoints.rcl_take,
tracepoints.rclcpp_take,
tracepoints.rcl_service_init,
tracepoints.rclcpp_service_callback_added,
tracepoints.rmw_take_request,
tracepoints.rmw_send_response,
tracepoints.rmw_client_init,
tracepoints.rcl_client_init,
tracepoints.rmw_send_request,
tracepoints.rmw_take_response,
tracepoints.rcl_timer_init,
tracepoints.rclcpp_timer_callback_added,
tracepoints.rclcpp_timer_link_node,
tracepoints.rclcpp_callback_register,
tracepoints.callback_start,
tracepoints.callback_end,
tracepoints.rcl_lifecycle_state_machine_init,
tracepoints.rcl_lifecycle_transition,
tracepoints.rclcpp_executor_get_next_ready,
tracepoints.rclcpp_executor_wait_for_work,
tracepoints.rclcpp_executor_execute,
tracepoints.rclcpp_ipb_to_subscription,
tracepoints.rclcpp_buffer_to_ipb,
tracepoints.rclcpp_construct_ring_buffer,
tracepoints.rclcpp_ring_buffer_enqueue,
tracepoints.rclcpp_ring_buffer_dequeue,
tracepoints.rclcpp_ring_buffer_clear
]
DEFAULT_EVENTS_UST = DEFAULT_EVENTS_ROS
CONTEXT_TYPE_CONSTANTS_MAP = {
'pid': 'EVENT_CONTEXT_PID',
'procname': 'EVENT_CONTEXT_PROCNAME',
'prio': 'EVENT_CONTEXT_PRIO',
'nice': 'EVENT_CONTEXT_NICE',
'vpid': 'EVENT_CONTEXT_VPID',
'tid': 'EVENT_CONTEXT_TID',
'vtid': 'EVENT_CONTEXT_VTID',
'ppid': 'EVENT_CONTEXT_PPID',
'vppid': 'EVENT_CONTEXT_VPPID',
'pthread_id': 'EVENT_CONTEXT_PTHREAD_ID',
'hostname': 'EVENT_CONTEXT_HOSTNAME',
'ip': 'EVENT_CONTEXT_IP',
'interruptible': 'EVENT_CONTEXT_INTERRUPTIBLE',
'preemptible': 'EVENT_CONTEXT_PREEMPTIBLE',
'need_reschedule': 'EVENT_CONTEXT_NEED_RESCHEDULE',
'migratable': 'EVENT_CONTEXT_MIGRATABLE',
'perf:thread:instructions': None,
'perf:thread:cycles': None,
'perf:thread:cpu-cycles': None,
}
CONTEXT = list(CONTEXT_TYPE_CONSTANTS_MAP.keys())
DOMAIN_TYPE_KERNEL = 'kernel'
DOMAIN_TYPE_USERSPACE = 'userspace'
# These apply to both kernel & userspace domains
DEFAULT_CONTEXT = [
'procname',
'vpid',

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2019, 2020 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,9 +17,6 @@ import os
import time
DEFAULT_BASE_PATH = '~/.ros/tracing/'
def append_timestamp(
session_name_base: str,
) -> str:
@ -31,17 +29,22 @@ def append_timestamp(
return session_name_base + '-' + time.strftime('%Y%m%d%H%M%S')
def get_full_session_path(
session_name: str,
base_path: str = DEFAULT_BASE_PATH
) -> str:
def get_tracing_directory() -> str:
"""
Get the full path to the trace directory of a given session.
Get tracing directory path.
:param session_name: the name of the tracing session
:param base_path: the path to the directory containing the trace directory
:return: the full path to the tracing session directory
Uses various environment variables to construct a tracing directory path.
Use $ROS_TRACE_DIR if ROS_TRACE_DIR is set and not empty.
Otherwise, use $ROS_HOME/tracing, using ~/.ros for ROS_HOME if not set or if empty.
It also expands '~' to the current user's home directory,
and normalizes the path, converting the path separator if necessary.
:return: the path to the tracing directory
"""
if base_path is None:
base_path = DEFAULT_BASE_PATH
return os.path.expanduser(os.path.join(base_path, session_name))
trace_dir = os.environ.get('ROS_TRACE_DIR')
if not trace_dir:
trace_dir = os.environ.get('ROS_HOME')
if not trace_dir:
trace_dir = os.path.join('~', '.ros')
trace_dir = os.path.join(trace_dir, 'tracing')
return os.path.normpath(os.path.expanduser(trace_dir))

View file

@ -0,0 +1,116 @@
# Copyright 2020 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Signal handling utilities."""
import signal
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
class SignalHandledException(RuntimeError):
"""Exception raised after a signal is handled."""
pass
class SignalHandlerUtil():
"""
Signal handler as a context manager.
Modified version of: https://stackoverflow.com/a/35798485/6476709
"""
def __init__(
self,
release_callback: Optional[Callable[[], None]] = None,
raise_after_signal: bool = False,
signals: List[int] = [signal.SIGINT],
) -> None:
"""
Create a SignalHandlerUtil object.
:param release_callback: the function to call on release, possibly after handling a signal
:param raise_after_signal: whether to raise a SignalHandledException after signal/callback
:param signals: the list of signals to handle
"""
self.release_callback = release_callback
self.raise_after_signal = raise_after_signal
self.signals = signals
self.original_handlers: Dict[int, Any] = {}
def __enter__(self) -> 'SignalHandlerUtil':
"""Enter context and setup signal handlers."""
self.interrupted = False
self.released = False
for sig in self.signals:
self.original_handlers[sig] = signal.getsignal(sig)
signal.signal(sig, self._handler)
return self
def _handler(self, signum, frame) -> None:
"""Handle signal and trigger release."""
self.interrupted = True
if signal.SIGINT == signum:
print()
self._release()
def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
"""Exit context and trigger release."""
self._release()
# Suppress this specific exception, since it is only meant to be a notification
if SignalHandledException == exc_type:
return True
return None
def _release(self) -> bool:
"""Release and restore signal handlers."""
if self.released:
return False
for sig in self.signals:
signal.signal(sig, self.original_handlers[sig])
self.released = True
if self.release_callback:
self.release_callback()
if self.interrupted and self.raise_after_signal:
raise SignalHandledException()
return True
def execute_and_handle_sigint(
run_function: Callable[[], None],
fini_function: Optional[Callable[[], None]] = None,
) -> None:
"""
Execute a task and handle SIGINT to always finalize cleanly.
The main task function is interrupted on SIGINT.
The finalization function (if provided) is always executed, either
after the main task function is done or after it is interrupted.
:param run_function: the task function, which may be interrupted
:param fini_function: the optional finalization/cleanup function
"""
with SignalHandlerUtil(release_callback=fini_function, raise_after_signal=True):
try:
run_function()
except SignalHandledException:
pass

View file

@ -0,0 +1,56 @@
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tracepoint names."""
rcl_init = 'ros2:rcl_init'
rcl_node_init = 'ros2:rcl_node_init'
rmw_publisher_init = 'ros2:rmw_publisher_init'
rcl_publisher_init = 'ros2:rcl_publisher_init'
rclcpp_publish = 'ros2:rclcpp_publish'
rclcpp_intra_publish = 'ros2:rclcpp_intra_publish'
rcl_publish = 'ros2:rcl_publish'
rmw_publish = 'ros2:rmw_publish'
rmw_subscription_init = 'ros2:rmw_subscription_init'
rcl_subscription_init = 'ros2:rcl_subscription_init'
rclcpp_subscription_init = 'ros2:rclcpp_subscription_init'
rclcpp_subscription_callback_added = 'ros2:rclcpp_subscription_callback_added'
rmw_take = 'ros2:rmw_take'
rcl_take = 'ros2:rcl_take'
rclcpp_take = 'ros2:rclcpp_take'
rcl_service_init = 'ros2:rcl_service_init'
rclcpp_service_callback_added = 'ros2:rclcpp_service_callback_added'
rmw_take_request = 'ros2:rmw_take_request'
rmw_send_response = 'ros2:rmw_send_response'
rmw_client_init = 'ros2:rmw_client_init'
rcl_client_init = 'ros2:rcl_client_init'
rmw_send_request = 'ros2:rmw_send_request'
rmw_take_response = 'ros2:rmw_take_response'
rcl_timer_init = 'ros2:rcl_timer_init'
rclcpp_timer_callback_added = 'ros2:rclcpp_timer_callback_added'
rclcpp_timer_link_node = 'ros2:rclcpp_timer_link_node'
rclcpp_callback_register = 'ros2:rclcpp_callback_register'
callback_start = 'ros2:callback_start'
callback_end = 'ros2:callback_end'
rcl_lifecycle_state_machine_init = 'ros2:rcl_lifecycle_state_machine_init'
rcl_lifecycle_transition = 'ros2:rcl_lifecycle_transition'
rclcpp_executor_get_next_ready = 'ros2:rclcpp_executor_get_next_ready'
rclcpp_executor_wait_for_work = 'ros2:rclcpp_executor_wait_for_work'
rclcpp_executor_execute = 'ros2:rclcpp_executor_execute'
rclcpp_ipb_to_subscription = 'ros2:rclcpp_ipb_to_subscription'
rclcpp_buffer_to_ipb = 'ros2:rclcpp_buffer_to_ipb'
rclcpp_construct_ring_buffer = 'ros2:rclcpp_construct_ring_buffer'
rclcpp_ring_buffer_enqueue = 'ros2:rclcpp_ring_buffer_enqueue'
rclcpp_ring_buffer_dequeue = 'ros2:rclcpp_ring_buffer_dequeue'
rclcpp_ring_buffer_clear = 'ros2:rclcpp_ring_buffer_clear'

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,91 +16,301 @@
"""Entrypoint/script to setup and start an LTTng tracing session."""
import argparse
import os
import sys
from typing import Callable
from typing import List
from typing import Optional
from typing import Tuple
from tracetools_trace.tools import args
from tracetools_trace.tools import lttng
from tracetools_trace.tools import path
from tracetools_trace.tools import print_names_list
from tracetools_trace.tools import signals
def init(
session_name: str,
base_path: str,
ros_events: List[str],
kernel_events: List[str],
context_names: List[str],
display_list: bool = False,
) -> None:
"""
Init and start tracing.
:param session_name: the name of the session
:param base_path: the path to the directory in which to create the tracing session directory
:param ros_events: list of ROS events to enable
:param kernel_events: list of kernel events to enable
:param context_names: list of context names to enable
:param display_list: whether to display list(s) of enabled events and context names
"""
def _assert_lttng_installed() -> None:
if not lttng.is_lttng_installed():
sys.exit(2)
ust_enabled = len(ros_events) > 0
kernel_enabled = len(kernel_events) > 0
if ust_enabled:
print(f'UST tracing enabled ({len(ros_events)} events)')
def _display_info(
*,
ros_events: List[str],
kernel_events: List[str],
syscalls: List[str],
context_fields: List[str],
display_list: bool,
) -> None:
if ros_events:
event_str = 'events' if len(ros_events) > 1 else 'event'
print(f'userspace tracing enabled ({len(ros_events)} {event_str})')
if display_list:
print_names_list(ros_events)
else:
print('UST tracing disabled')
if kernel_enabled:
print(f'kernel tracing enabled ({len(kernel_events)} events)')
print('userspace tracing disabled')
if kernel_events:
event_str = 'events' if len(kernel_events) > 1 else 'event'
print(f'kernel tracing enabled ({len(kernel_events)} {event_str})')
if display_list:
print_names_list(kernel_events)
else:
print('kernel tracing disabled')
if len(context_names) > 0:
print(f'context ({len(context_names)} names)')
if syscalls:
syscall_str = 'syscalls' if len(syscalls) > 1 else 'syscall'
print(f'syscall tracing enabled ({len(syscalls)} {syscall_str})')
if display_list:
print_names_list(context_names)
print_names_list(syscalls)
else:
print('syscalls tracing disabled')
if len(context_fields) > 0:
field_str = 'fields' if len(context_fields) > 1 else 'field'
print(f'context ({len(context_fields)} {field_str})')
if display_list:
print_names_list(context_fields)
full_session_path = path.get_full_session_path(session_name, base_path)
def _resolve_session_path(
*,
session_name: str,
base_path: Optional[str],
) -> Tuple[str, str]:
if not base_path:
base_path = path.get_tracing_directory()
full_session_path = os.path.join(base_path, session_name)
print(f'writing tracing session to: {full_session_path}')
input('press enter to start...')
lttng.lttng_init(
session_name,
base_path=base_path,
return base_path, full_session_path
def init(
*,
session_name: str,
base_path: Optional[str],
append_trace: bool,
ros_events: List[str],
kernel_events: List[str],
syscalls: List[str],
context_fields: List[str],
display_list: bool,
interactive: bool,
) -> bool:
"""
Init and start tracing.
Can be interactive by requiring user interaction to start tracing. If non-interactive, tracing
starts right away.
Raises RuntimeError on failure, in which case the tracing session might still exist.
:param session_name: the name of the session
:param base_path: the path to the directory in which to create the tracing session directory,
or `None` for default
:param append_trace: whether to append to the trace directory if it already exists, otherwise
an error is reported
:param ros_events: list of ROS events to enable
:param kernel_events: list of kernel events to enable
:param syscalls: list of syscalls to enable
:param context_fields: list of context fields to enable
:param display_list: whether to display list(s) of enabled events and context names
:param interactive: whether to require user interaction to start tracing
:return: True if successful, False otherwise
"""
_display_info(
ros_events=ros_events,
kernel_events=kernel_events,
context_names=context_names,
syscalls=syscalls,
context_fields=context_fields,
display_list=display_list,
)
base_path, full_session_path = _resolve_session_path(
session_name=session_name,
base_path=base_path,
)
if interactive:
input('press enter to start...')
trace_directory = lttng.lttng_init(
session_name=session_name,
base_path=base_path,
append_trace=append_trace,
ros_events=ros_events,
kernel_events=kernel_events,
syscalls=syscalls,
context_fields=context_fields,
)
if trace_directory is None:
return False
# Simple sanity check
assert trace_directory == full_session_path
return True
def fini(
*,
session_name: str,
) -> None:
"""
Stop and finalize tracing.
Needs user interaction to stop tracing. Stops tracing automatically on SIGINT.
:param session_name: the name of the session
"""
input('press enter to stop...')
print('stopping & destroying tracing session')
lttng.lttng_fini(session_name)
def _run() -> None:
input('press enter to stop...')
def _fini() -> None:
print('stopping & destroying tracing session')
lttng.lttng_fini(session_name=session_name)
signals.execute_and_handle_sigint(_run, _fini)
def main():
params = args.parse_args()
def cleanup(
*,
session_name: str,
) -> None:
"""
Clean up and remove tracing session if it exists.
init(
params.session_name,
params.path,
params.events_ust,
params.events_kernel,
params.context_names,
params.list,
)
fini(
params.session_name,
)
:param session_name: the name of the session
"""
lttng.lttng_fini(session_name=session_name, ignore_error=True)
def _do_work_and_report_error(
work: Callable[[], int],
session_name: str,
*,
do_cleanup: bool,
) -> int:
"""
Perform some work, reporting any error and cleaning up.
This will call the work function and catch `RuntimeError`, in which case the error will be
printed, and the session will be cleaned up if needed.
:param work: the work function to be called which may raise `RuntimeError`
:param session_name: the session name
:param do_cleanup: whether to clean the session up on error
:return: the return code of the work function, or 1 if an error was reported
"""
_assert_lttng_installed()
try:
return work()
except RuntimeError as e:
print(f'error: {str(e)}', file=sys.stderr)
if do_cleanup:
cleanup(session_name=session_name)
return 1
def trace(args: argparse.Namespace) -> int:
"""
Trace.
Needs user interaction to start tracing and then stop tracing.
On failure, the tracing session will not exist.
:param args: the arguments parsed using `tracetools_trace.tools.args.add_arguments`
:return: the return code (0 if successful, 1 otherwise)
"""
def work() -> int:
if not init(
session_name=args.session_name,
base_path=args.path,
append_trace=args.append_trace,
ros_events=args.events_ust,
kernel_events=args.events_kernel,
syscalls=args.syscalls,
context_fields=args.context_fields,
display_list=args.list,
interactive=True,
):
return 1
fini(session_name=args.session_name)
return 0
return _do_work_and_report_error(work, args.session_name, do_cleanup=True)
def start(args: argparse.Namespace) -> int:
"""
Configure tracing session and start tracing.
On failure, the tracing session will not exist.
:param args: the arguments parsed using
`tracetools_trace.tools.args.add_arguments_noninteractive`
:return: the return code (0 if successful, 1 otherwise)
"""
def work() -> int:
return int(
not init(
session_name=args.session_name,
base_path=args.path,
append_trace=args.append_trace,
ros_events=args.events_ust,
kernel_events=args.events_kernel,
syscalls=args.syscalls,
context_fields=args.context_fields,
display_list=args.list,
interactive=False,
)
)
return _do_work_and_report_error(work, args.session_name, do_cleanup=True)
def stop(args: argparse.Namespace) -> int:
"""
Stop tracing.
On failure, the tracing session might still exist.
:param args: the arguments parsed using
`tracetools_trace.tools.args.add_arguments_session_name`
:return: the return code (0 if successful, 1 otherwise)
"""
def work() -> int:
lttng.lttng_fini(session_name=args.session_name)
return 0
return _do_work_and_report_error(work, args.session_name, do_cleanup=False)
def pause(args: argparse.Namespace) -> int:
"""
Pause tracing after starting or resuming.
On failure, the tracing session might still exist.
:param args: the arguments parsed using
`tracetools_trace.tools.args.add_arguments_session_name`
:return: the return code (0 if successful, 1 otherwise)
"""
def work() -> int:
lttng.lttng_stop(session_name=args.session_name)
return 0
return _do_work_and_report_error(work, args.session_name, do_cleanup=False)
def resume(args: argparse.Namespace) -> int:
"""
Resume tracing after pausing.
On failure, the tracing session might still exist.
:param args: the arguments parsed using
`tracetools_trace.tools.args.add_arguments_session_name`
:return: the return code (0 if successful, 1 otherwise)
"""
def work() -> int:
lttng.lttng_start(session_name=args.session_name)
return 0
return _do_work_and_report_error(work, args.session_name, do_cleanup=False)
def main() -> int:
return trace(args.parse_args())