Merge branch 'enhance-tests' into 'master'

enhance tests

Closes #3

See merge request micro-ROS/ros2_tracing!5
This commit is contained in:
Ingo Lütkebohle 2019-06-19 14:12:15 +00:00
commit 8ab2ac1a7d
17 changed files with 716 additions and 219 deletions

View file

@ -24,7 +24,7 @@ class PingNode : public rclcpp::Node
{ {
public: public:
explicit PingNode(rclcpp::NodeOptions options) explicit PingNode(rclcpp::NodeOptions options)
: Node("ping_node", options) : Node("test_ping", options)
{ {
sub_ = this->create_subscription<std_msgs::msg::String>( sub_ = this->create_subscription<std_msgs::msg::String>(
"pong", "pong",

View file

@ -21,7 +21,7 @@ class PongNode : public rclcpp::Node
{ {
public: public:
explicit PongNode(rclcpp::NodeOptions options) explicit PongNode(rclcpp::NodeOptions options)
: Node("pong_node", options) : Node("test_pong", options)
{ {
sub_ = this->create_subscription<std_msgs::msg::String>( sub_ = this->create_subscription<std_msgs::msg::String>(
"ping", "ping",

View file

@ -21,7 +21,7 @@ class PubNode : public rclcpp::Node
{ {
public: public:
explicit PubNode(rclcpp::NodeOptions options) explicit PubNode(rclcpp::NodeOptions options)
: Node("pub_node", options) : Node("test_publisher", options)
{ {
pub_ = this->create_publisher<std_msgs::msg::String>( pub_ = this->create_publisher<std_msgs::msg::String>(
"the_topic", "the_topic",

View file

@ -21,10 +21,10 @@ class ServiceNode : public rclcpp::Node
{ {
public: public:
explicit ServiceNode(rclcpp::NodeOptions options) explicit ServiceNode(rclcpp::NodeOptions options)
: Node("service_node", options) : Node("test_service", options)
{ {
srv_ = this->create_service<std_srvs::srv::Empty>( srv_ = this->create_service<std_srvs::srv::Empty>(
"service", "the_service",
std::bind( std::bind(
&ServiceNode::service_callback, &ServiceNode::service_callback,
this, this,

View file

@ -24,7 +24,7 @@ class PingNode : public rclcpp::Node
{ {
public: public:
explicit PingNode(rclcpp::NodeOptions options) explicit PingNode(rclcpp::NodeOptions options)
: Node("ping_node", options) : Node("test_service_ping", options)
{ {
srv_ = this->create_service<std_srvs::srv::Empty>( srv_ = this->create_service<std_srvs::srv::Empty>(
"pong", "pong",

View file

@ -21,7 +21,7 @@ class PongNode : public rclcpp::Node
{ {
public: public:
explicit PongNode(rclcpp::NodeOptions options) explicit PongNode(rclcpp::NodeOptions options)
: Node("pong_node", options) : Node("test_service_pong", options)
{ {
srv_ = this->create_service<std_srvs::srv::Empty>( srv_ = this->create_service<std_srvs::srv::Empty>(
"ping", "ping",

View file

@ -21,7 +21,7 @@ class SubNode : public rclcpp::Node
{ {
public: public:
explicit SubNode(rclcpp::NodeOptions options) explicit SubNode(rclcpp::NodeOptions options)
: Node("sub_node", options) : Node("test_subscription", options)
{ {
sub_ = this->create_subscription<std_msgs::msg::String>( sub_ = this->create_subscription<std_msgs::msg::String>(
"the_topic", "the_topic",

View file

@ -23,7 +23,7 @@ class TimerNode : public rclcpp::Node
{ {
public: public:
explicit TimerNode(rclcpp::NodeOptions options) explicit TimerNode(rclcpp::NodeOptions options)
: Node("timer_node", options) : Node("test_timer", options)
{ {
is_done_ = false; is_done_ = false;
timer_ = this->create_wall_timer( timer_ = this->create_wall_timer(

View file

@ -14,40 +14,49 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
node_creation_events = [
'ros2:rcl_init',
'ros2:rcl_node_init',
]
class TestNode(unittest.TestCase): VERSION_REGEX = r'^[0-9]\.[0-9]\.[0-9]$'
def test_creation(self):
session_name_prefix = 'session-test-node-creation'
test_node = ['test_publisher']
exit_code, full_path = run_and_trace( class TestNode(TraceTestCase):
BASE_PATH,
session_name_prefix,
node_creation_events,
None,
PKG,
test_node)
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path) def __init__(self, *args) -> None:
print(f'trace_events: {trace_events}') super().__init__(
self.assertSetEqual(set(node_creation_events), trace_events) *args,
session_name_prefix='session-test-node-creation',
events_ros=[
'ros2:rcl_init',
'ros2:rcl_node_init',
],
nodes=['test_publisher']
)
cleanup_trace(full_path) def test_all(self):
# Check events order as set (e.g. init before node_init)
self.assertEventsOrderSet(self._events_ros)
# Check fields
rcl_init_events = self.get_events_with_name('ros2:rcl_init')
for event in rcl_init_events:
self.assertValidHandle(event, 'context_handle')
# TODO actually compare to version fetched from the tracetools package?
version_field = self.get_field(event, 'version')
self.assertRegex(version_field, VERSION_REGEX, 'invalid version number')
rcl_node_init_events = self.get_events_with_name('ros2:rcl_node_init')
for event in rcl_node_init_events:
self.assertValidHandle(event, ['node_handle', 'rmw_handle'])
self.assertStringFieldNotEmpty(event, 'node_name')
self.assertStringFieldNotEmpty(event, 'namespace')
# Check that the launched nodes have a corresponding rcl_node_init event
node_name_fields = [self.get_field(e, 'node_name') for e in rcl_node_init_events]
for node_name in self._nodes:
self.assertTrue(
node_name in node_name_fields,
f'cannot find node_init event for node name: {node_name} ({node_name_fields})')
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -14,39 +14,52 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
publisher_creation_events = [
'ros2:rcl_publisher_init',
]
class TestPublisher(unittest.TestCase): class TestPublisher(TraceTestCase):
def test_creation(self): def __init__(self, *args) -> None:
session_name_prefix = 'session-test-publisher-creation' super().__init__(
test_node = ['test_publisher'] *args,
session_name_prefix='session-test-publisher-creation',
events_ros=[
'ros2:rcl_node_init',
'ros2:rcl_publisher_init',
],
nodes=['test_publisher']
)
exit_code, full_path = run_and_trace( def test_all(self):
BASE_PATH, # Check events order as set (e.g. node_init before pub_init)
session_name_prefix, self.assertEventsOrderSet(self._events_ros)
publisher_creation_events,
# Check fields
pub_init_events = self.get_events_with_name('ros2:rcl_publisher_init')
for event in pub_init_events:
self.assertValidHandle(
event,
['publisher_handle', 'node_handle', 'rmw_publisher_handle'])
self.assertValidQueueDepth(event, 'queue_depth')
self.assertStringFieldNotEmpty(event, 'topic_name')
# Check that the test topic name exists
test_pub_init_events = self.get_events_with_procname('test_publisher', pub_init_events)
event_topic_names = [self.get_field(e, 'topic_name') for e in test_pub_init_events]
self.assertTrue('/the_topic' in event_topic_names, 'cannot find test topic name')
# Check that the node handle matches with the node_init event
node_init_events = self.get_events_with_name('ros2:rcl_node_init')
test_pub_node_init_events = self.get_events_with_procname(
'test_publisher',
node_init_events)
self.assertEqual(len(test_pub_node_init_events), 1, 'none or more than 1 node_init event')
test_pub_node_init_event = test_pub_node_init_events[0]
self.assertMatchingField(
test_pub_node_init_event,
'node_handle',
None, None,
PKG, test_pub_init_events)
test_node)
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path)
print(f'trace_events: {trace_events}')
self.assertSetEqual(set(publisher_creation_events), trace_events)
cleanup_trace(full_path)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -14,40 +14,68 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
service_creation_events = [
'ros2:rcl_service_init',
'ros2:rclcpp_service_callback_added',
]
class TestService(unittest.TestCase): class TestService(TraceTestCase):
def test_creation(self): def __init__(self, *args) -> None:
session_name_prefix = 'session-test-service-creation' super().__init__(
test_nodes = ['test_service'] *args,
session_name_prefix='session-test-service-creation',
events_ros=[
'ros2:rcl_node_init',
'ros2:rcl_service_init',
'ros2:rclcpp_service_callback_added',
],
nodes=['test_service']
)
exit_code, full_path = run_and_trace( def test_all(self):
BASE_PATH, # Check events order as set (e.g. service_init before callback_added)
session_name_prefix, self.assertEventsOrderSet(self._events_ros)
service_creation_events,
# Check fields
srv_init_events = self.get_events_with_name('ros2:rcl_service_init')
for event in srv_init_events:
self.assertValidHandle(event, ['service_handle', 'node_handle', 'rmw_service_handle'])
self.assertStringFieldNotEmpty(event, 'service_name')
callback_added_events = self.get_events_with_name('ros2:rclcpp_service_callback_added')
for event in callback_added_events:
self.assertValidHandle(event, ['service_handle', 'callback'])
# Check that the test service name exists
test_srv_init_events = self.get_events_with_procname('test_service', srv_init_events)
event_service_names = self.get_events_with_field_value(
'service_name',
'/the_service',
test_srv_init_events)
self.assertGreaterEqual(
len(event_service_names),
1,
'cannot find test service name')
# Check that the node handle matches the node_init event
node_init_events = self.get_events_with_name('ros2:rcl_node_init')
test_srv_node_init_events = self.get_events_with_procname(
'test_service',
node_init_events)
self.assertEqual(len(test_srv_node_init_events), 1, 'none or more than 1 node_init event')
test_srv_node_init_event = test_srv_node_init_events[0]
self.assertMatchingField(
test_srv_node_init_event,
'node_handle',
'ros2:rcl_service_init',
test_srv_init_events)
# Check that the service handles match
test_event_srv_init = event_service_names[0]
self.assertMatchingField(
test_event_srv_init,
'service_handle',
None, None,
PKG, callback_added_events)
test_nodes)
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path)
print(f'trace_events: {trace_events}')
self.assertSetEqual(set(service_creation_events), trace_events)
cleanup_trace(full_path)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -14,40 +14,54 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
service_callback_events = [
'ros2:callback_start',
'ros2:callback_end',
]
class TestServiceCallback(unittest.TestCase): class TestServiceCallback(TraceTestCase):
def test_callback(self): def __init__(self, *args) -> None:
session_name_prefix = 'session-test-service-callback' super().__init__(
test_nodes = ['test_service_ping', 'test_service_pong'] *args,
session_name_prefix='session-test-service-callback',
events_ros=[
'ros2:callback_start',
'ros2:callback_end',
],
nodes=['test_service_ping', 'test_service_pong']
)
exit_code, full_path = run_and_trace( def test_all(self):
BASE_PATH, # Check events order as set (e.g. start before end)
session_name_prefix, self.assertEventsOrderSet(self._events_ros)
service_callback_events,
None,
PKG,
test_nodes)
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path) # Check fields
print(f'trace_events: {trace_events}') start_events = self.get_events_with_name('ros2:callback_start')
self.assertSetEqual(set(service_callback_events), trace_events) for event in start_events:
self.assertValidHandle(event, 'callback')
is_intra_process_value = self.get_field(event, 'is_intra_process')
self.assertIsInstance(is_intra_process_value, int, 'is_intra_process not int')
# Should not be 1 for services (yet)
self.assertEqual(
is_intra_process_value,
0,
f'invalid value for is_intra_process: {is_intra_process_value}')
cleanup_trace(full_path) end_events = self.get_events_with_name('ros2:callback_end')
for event in end_events:
self.assertValidHandle(event, 'callback')
# Check that there is at least a start/end pair for each node
for node in self._nodes:
test_start_events = self.get_events_with_procname(node, start_events)
test_end_events = self.get_events_with_procname(node, end_events)
self.assertGreater(
len(test_start_events),
0,
f'no start_callback events for node: {node}')
self.assertGreater(
len(test_end_events),
0,
f'no end_callback events for node: {node}')
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -14,40 +14,71 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
subscription_creation_events = [
'ros2:rcl_subscription_init',
'ros2:rclcpp_subscription_callback_added',
]
class TestSubscription(unittest.TestCase): class TestSubscription(TraceTestCase):
def test_creation(self): def __init__(self, *args) -> None:
session_name_prefix = 'session-test-subscription-creation' super().__init__(
test_node = ['test_subscription'] *args,
session_name_prefix='session-test-subscription-creation',
events_ros=[
'ros2:rcl_node_init',
'ros2:rcl_subscription_init',
'ros2:rclcpp_subscription_callback_added',
],
nodes=['test_subscription']
)
exit_code, full_path = run_and_trace( def test_all(self):
BASE_PATH, # Check events order as set (e.g. sub_init before callback_added)
session_name_prefix, self.assertEventsOrderSet(self._events_ros)
subscription_creation_events,
# Check fields
sub_init_events = self.get_events_with_name('ros2:rcl_subscription_init')
for event in sub_init_events:
self.assertValidHandle(
event,
['subscription_handle', 'node_handle', 'rmw_subscription_handle'])
self.assertValidQueueDepth(event, 'queue_depth')
self.assertStringFieldNotEmpty(event, 'topic_name')
callback_added_events = self.get_events_with_name(
'ros2:rclcpp_subscription_callback_added')
for event in callback_added_events:
self.assertValidHandle(event, ['subscription_handle', 'callback'])
# Check that the test topic name exists
test_sub_init_events = self.get_events_with_field_value(
'topic_name',
'/the_topic',
sub_init_events)
self.assertEqual(len(test_sub_init_events), 1, 'cannot find test topic name')
test_sub_init_event = test_sub_init_events[0]
# Check that the node handle matches the node_init event
node_init_events = self.get_events_with_name('ros2:rcl_node_init')
test_sub_node_init_events = self.get_events_with_procname(
'test_subscription',
node_init_events)
self.assertEqual(
len(test_sub_node_init_events),
1,
'none or more than 1 node_init event')
test_sub_node_init_event = test_sub_node_init_events[0]
self.assertMatchingField(
test_sub_node_init_event,
'node_handle',
'ros2:rcl_subscription_init',
sub_init_events)
# Check that subscription handle matches with callback_added event
self.assertMatchingField(
test_sub_init_event,
'subscription_handle',
None, None,
PKG, callback_added_events)
test_node)
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path)
print(f'trace_events: {trace_events}')
self.assertSetEqual(set(subscription_creation_events), trace_events)
cleanup_trace(full_path)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -14,40 +14,58 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
subscription_callback_events = [
'ros2:callback_start',
'ros2:callback_end',
]
class TestSubscriptionCallback(unittest.TestCase): class TestSubscriptionCallback(TraceTestCase):
def test_callback(self): def __init__(self, *args) -> None:
session_name_prefix = 'session-test-subscription-callback' super().__init__(
test_nodes = ['test_ping', 'test_pong'] *args,
session_name_prefix='session-test-subscription-callback',
events_ros=[
'ros2:callback_start',
'ros2:callback_end',
],
nodes=['test_ping', 'test_pong']
)
exit_code, full_path = run_and_trace( def test_all(self):
BASE_PATH, # Check events order as set (e.g. start before end)
session_name_prefix, self.assertEventsOrderSet(self._events_ros)
subscription_callback_events,
None,
PKG,
test_nodes)
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path) # Check fields
print(f'trace_events: {trace_events}') start_events = self.get_events_with_name('ros2:callback_start')
self.assertSetEqual(set(subscription_callback_events), trace_events) for event in start_events:
self.assertValidHandle(event, 'callback')
is_intra_process_value = self.get_field(event, 'is_intra_process')
self.assertIsInstance(is_intra_process_value, int, 'is_intra_process not int')
self.assertTrue(
is_intra_process_value in [0, 1],
f'invalid value for is_intra_process: {is_intra_process_value}')
cleanup_trace(full_path) end_events = self.get_events_with_name('ros2:callback_end')
for event in end_events:
self.assertValidHandle(event, 'callback')
# Check that a start:end pair has a common callback handle
# Note: might be unstable if tracing is disabled too early
ping_events = self.get_events_with_procname('test_ping')
pong_events = self.get_events_with_procname('test_pong')
ping_events_start = self.get_events_with_name('ros2:callback_start', ping_events)
pong_events_start = self.get_events_with_name('ros2:callback_start', pong_events)
for ping_start in ping_events_start:
self.assertMatchingField(
ping_start,
'callback',
'ros2:callback_end',
ping_events)
for pong_start in pong_events_start:
self.assertMatchingField(
pong_start,
'callback',
'ros2:callback_end',
pong_events)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -14,42 +14,77 @@
import unittest import unittest
from tracetools_test.utils import ( from tracetools_test.case import TraceTestCase
cleanup_trace,
get_trace_event_names,
run_and_trace,
)
BASE_PATH = '/tmp'
PKG = 'tracetools_test'
timer_events = [
'ros2:rcl_timer_init',
'ros2:rclcpp_timer_callback_added',
'ros2:callback_start',
'ros2:callback_end',
]
class TestTimer(unittest.TestCase): class TestTimer(TraceTestCase):
def __init__(self, *args) -> None:
super().__init__(
*args,
session_name_prefix='session-test-timer-all',
events_ros=[
'ros2:rcl_timer_init',
'ros2:rclcpp_timer_callback_added',
'ros2:callback_start',
'ros2:callback_end',
],
nodes=['test_timer']
)
def test_all(self): def test_all(self):
session_name_prefix = 'session-test-timer-all' # Check events order as set (e.g. init, callback added, start, end)
test_nodes = ['test_timer'] self.assertEventsOrderSet(self._events_ros)
exit_code, full_path = run_and_trace( # Check fields
BASE_PATH, init_events = self.get_events_with_name('ros2:rcl_timer_init')
session_name_prefix, for event in init_events:
timer_events, self.assertValidHandle(event, 'timer_handle')
None, period_value = self.get_field(event, 'period')
PKG, self.assertIsInstance(period_value, int)
test_nodes) self.assertGreaterEqual(period_value, 0, f'invalid period value: {period_value}')
self.assertEqual(exit_code, 0)
trace_events = get_trace_event_names(full_path) callback_added_events = self.get_events_with_name('ros2:rclcpp_timer_callback_added')
print(f'trace_events: {trace_events}') for event in callback_added_events:
self.assertSetEqual(set(timer_events), trace_events) self.assertValidHandle(event, ['timer_handle', 'callback'])
cleanup_trace(full_path) start_events = self.get_events_with_name('ros2:callback_start')
for event in start_events:
self.assertValidHandle(event, 'callback')
is_intra_process_value = self.get_field(event, 'is_intra_process')
self.assertIsInstance(is_intra_process_value, int, 'is_intra_process not int')
# Should not be 1 for timer
self.assertEqual(
is_intra_process_value,
0,
f'invalid value for is_intra_process: {is_intra_process_value}')
end_events = self.get_events_with_name('ros2:callback_end')
for event in end_events:
self.assertValidHandle(event, 'callback')
# Find and check given timer period
test_timer_init_event = self.get_events_with_procname('test_timer', init_events)
self.assertEqual(len(test_timer_init_event), 1)
test_init_event = test_timer_init_event[0]
test_period = self.get_field(test_init_event, 'period')
self.assertIsInstance(test_period, int)
self.assertEqual(test_period, 1000000, f'invalid period: {test_period}')
# Check that the timer_init:callback_added pair exists and has a common timer handle
self.assertMatchingField(
test_init_event,
'timer_handle',
'ros2:rclcpp_timer_callback_added',
callback_added_events)
# Check that a callback start:end pair has a common callback handle
for start_event in start_events:
self.assertMatchingField(
start_event,
'callback',
None,
end_events)
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -0,0 +1,298 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for a tracing-specific unittest.TestCase extension."""
import time
from typing import Any
from typing import List
from typing import Union
import unittest
from .utils import cleanup_trace
from .utils import DictEvent
from .utils import get_event_name
from .utils import get_event_names
from .utils import get_event_timestamp
from .utils import get_field
from .utils import get_procname
from .utils import get_trace_events
from .utils import run_and_trace
class TraceTestCase(unittest.TestCase):
"""
TestCase extension for tests on a trace.
Sets up tracing, traces given nodes, and provides
the resulting events for an extending class to test on.
It also does some basic checks on the resulting trace.
"""
def __init__(
self,
*args,
session_name_prefix: str,
events_ros: List[str],
nodes: List[str],
base_path: str = '/tmp',
events_kernel: List[str] = None,
package: str = 'tracetools_test'
) -> None:
"""Constructor."""
print(f'methodName={args[0]}')
super().__init__(methodName=args[0])
self._base_path = base_path
self._session_name_prefix = session_name_prefix
self._events_ros = events_ros
self._events_kernel = events_kernel
self._package = package
self._nodes = nodes
def setUp(self):
# Get timestamp before trace (ns)
timestamp_before = int(time.time() * 1000000000.0)
exit_code, full_path = run_and_trace(
self._base_path,
self._session_name_prefix,
self._events_ros,
self._events_kernel,
self._package,
self._nodes)
print(f'TRACE DIRECTORY: {full_path}')
self._exit_code = exit_code
self._full_path = full_path
# Check that setUp() ran fine
self.assertEqual(self._exit_code, 0)
# Read events once
self._events = get_trace_events(self._full_path)
self._event_names = get_event_names(self._events)
self.assertGreater(len(self._events), 0, 'no events found in trace')
# Check the timestamp of the first event
self.assertEventAfterTimestamp(self._events[0], timestamp_before)
# Check that the enabled events are present
ros = set(self._events_ros) if self._events_ros is not None else set()
kernel = set(self._events_kernel) if self._events_kernel is not None else set()
all_event_names = ros | kernel
self.assertSetEqual(all_event_names, set(self._event_names))
# Check that the launched nodes are present as processes
self.assertProcessNamesExist(self._nodes)
def tearDown(self):
cleanup_trace(self._full_path)
def assertEventsOrderSet(self, event_names: List[str]):
"""
Compare given event names to trace events names as sets.
:param event_names: the list of event names to compare to (as a set)
"""
self.assertSetEqual(set(self._event_names), set(event_names), 'wrong events order')
def assertProcessNamesExist(self, names: List[str]):
"""
Check that the given processes exist.
:param names: the node names to look for
"""
procnames = [get_procname(e) for e in self._events]
for name in names:
# Procnames have a max length of 15
name_trimmed = name[:15]
self.assertTrue(name_trimmed in procnames, 'node name not found in tracepoints')
def assertValidHandle(self, event: DictEvent, handle_field_name: Union[str, List[str]]):
"""
Check that the handle associated with a field name is valid.
:param event: the event which has a handle field
:param handle_field_name: the field name(s) of the handle to check
"""
is_list = isinstance(handle_field_name, list)
handle_field_names = handle_field_name if is_list else [handle_field_name]
for field_name in handle_field_names:
handle_value = self.get_field(event, field_name)
self.assertIsInstance(handle_value, int, 'handle value not int')
self.assertGreater(handle_value, 0, f'invalid handle value: {field_name}')
def assertValidQueueDepth(self, event: DictEvent, queue_depth_field_name: str = 'queue_depth'):
queue_depth_value = self.get_field(event, 'queue_depth')
self.assertIsInstance(queue_depth_value, int, 'invalid queue depth type')
self.assertGreater(queue_depth_value, 0, 'invalid queue depth')
def assertStringFieldNotEmpty(self, event: DictEvent, string_field_name: str):
"""
Check that a string field is not empty.
:param event: the event which has a string field
:param string_field_name: the field name of the string field
"""
string_field = self.get_field(event, string_field_name)
self.assertGreater(len(string_field), 0, 'empty string')
def assertEventAfterTimestamp(self, event: DictEvent, timestamp: int):
self.assertGreater(get_event_timestamp(event), timestamp, 'event not after timestamp')
def assertEventOrder(self, first_event: DictEvent, second_event: DictEvent):
"""
Check that the first event was generated before the second event.
:param first_event: the first event
:param second_event: the second event
"""
self.assertTrue(self.are_events_ordered(first_event, second_event))
def assertMatchingField(
self,
initial_event: DictEvent,
field_name: str,
matching_event_name: str = None,
events: List[DictEvent] = None
):
"""
Check that the value of a field for a given event has a matching event that follows.
:param initial_event: the first event, which is the origin of the common field value
:param field_name: the name of the common field to check
:param matching_event_name: the name of the event to check (or None to check all)
:param events: the events to check (or None to check all in trace)
"""
if events is None:
events = self._events
if matching_event_name is not None:
events = self.get_events_with_name(matching_event_name, events)
field_value = self.get_field(initial_event, field_name)
# Get events with that handle
matches = self.get_events_with_field_value(
field_name,
field_value,
events)
# Check that there is at least one
self.assertGreaterEqual(
len(matches),
1,
f'no corresponding {field_name}')
# Check order
# Since matching pairs might repeat, we need to check
# that there is at least one match that comes after
matches_ordered = [e for e in matches if self.are_events_ordered(initial_event, e)]
self.assertGreaterEqual(
len(matches_ordered),
1,
'matching field event not after initial event')
def assertFieldEquals(self, event: DictEvent, field_name: str, value: Any):
"""
Check the value of a field.
:param event: the event
:param field_name: the name of the field to check
:param value: to value to compare the field value to
"""
actual_value = self.get_field(event, field_name)
self.assertEqual(actual_value, value, 'invalid field value')
def get_field(self, event: DictEvent, field_name: str) -> Any:
"""
Get field value; will fail test if not found.
:param event: the event from which to get the value
:param field_name: the field name
:return: the value
"""
try:
value = get_field(event, field_name, default=None, raise_if_not_found=True)
except AttributeError as e:
# Explicitly failing here
self.fail(str(e))
else:
return value
def get_procname(self, event: DictEvent) -> str:
"""
Get procname.
:param event: the event
:return: the procname of the event
"""
return get_procname(event)
def get_events_with_name(
self,
event_name: str,
events: List[DictEvent] = None
) -> List[DictEvent]:
"""
Get all events with the given name.
:param event_name: the event name
:param events: the events to check (or None to check all events)
:return: the list of events with the given name
"""
if events is None:
events = self._events
return [e for e in events if get_event_name(e) == event_name]
def get_events_with_procname(
self,
procname: str,
events: List[DictEvent] = None
) -> List[DictEvent]:
"""
Get all events with the given procname.
:param procname: the procname
:param events: the events to check (or None to check all events)
:return: the events with the given procname
"""
if events is None:
events = self._events
return [e for e in events if self.get_procname(e) == procname[:15]]
def get_events_with_field_value(
self,
field_name: str,
field_value: Any,
events: List[DictEvent] = None
) -> List[DictEvent]:
"""
Get all events with the given field:value.
:param field_name: the name of the field to check
:param field_value: the value of the field to check
:param events: the events to check (or None to check all events)
:return: the events with the given field:value pair
"""
if events is None:
events = self._events
return [e for e in events if get_field(e, field_name, None) == field_value]
def are_events_ordered(self, first_event: DictEvent, second_event: DictEvent):
"""
Check that the first event was generated before the second event.
:param first_event: the first event
:param second_event: the second event
"""
first_timestamp = get_event_timestamp(first_event)
second_timestamp = get_event_timestamp(second_event)
return first_timestamp < second_timestamp

View file

@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""Utils for tracetools_test.""" """Utils for tracetools_test that are not strictly test-related."""
import os import os
import shutil import shutil
import time import time
from typing import Any
from typing import Dict
from typing import List from typing import List
from typing import Set
from typing import Tuple from typing import Tuple
import babeltrace import babeltrace
@ -41,7 +42,7 @@ def run_and_trace(
kernel_events: List[str], kernel_events: List[str],
package_name: str, package_name: str,
node_names: List[str] node_names: List[str]
) -> Tuple[int, str]: ) -> Tuple[int, str]:
""" """
Run a node while tracing. Run a node while tracing.
@ -55,7 +56,6 @@ def run_and_trace(
""" """
session_name = f'{session_name_prefix}-{time.strftime("%Y%m%d%H%M%S")}' session_name = f'{session_name_prefix}-{time.strftime("%Y%m%d%H%M%S")}'
full_path = os.path.join(base_path, session_name) full_path = os.path.join(base_path, session_name)
print(f'trace directory: {full_path}')
lttng_setup(session_name, full_path, ros_events=ros_events, kernel_events=kernel_events) lttng_setup(session_name, full_path, ros_events=ros_events, kernel_events=kernel_events)
lttng_start(session_name) lttng_start(session_name)
@ -89,19 +89,70 @@ def cleanup_trace(full_path: str) -> None:
shutil.rmtree(full_path) shutil.rmtree(full_path)
def get_trace_event_names(trace_directory: str) -> Set[str]: DictEvent = Dict[str, Any]
def get_trace_events(trace_directory: str) -> List[DictEvent]:
""" """
Get a set of event names in a trace. Get the events of a trace.
:param trace_directory: the path to the main/top trace directory :param trace_directory: the path to the main/top trace directory
:return: event names :return: events
""" """
tc = babeltrace.TraceCollection() tc = babeltrace.TraceCollection()
tc.add_traces_recursive(trace_directory, 'ctf') tc.add_traces_recursive(trace_directory, 'ctf')
event_names = set() return [_event_to_dict(event) for event in tc.events]
for event in tc.events:
event_names.add(event.name)
return event_names # List of ignored CTF fields
_IGNORED_FIELDS = [
'content_size', 'cpu_id', 'events_discarded', 'id', 'packet_size', 'packet_seq_num',
'stream_id', 'stream_instance_id', 'timestamp_end', 'timestamp_begin', 'magic', 'uuid', 'v'
]
_DISCARD = 'events_discarded'
def _event_to_dict(event: babeltrace.babeltrace.Event) -> DictEvent:
"""
Convert name, timestamp, and all other keys except those in IGNORED_FIELDS into a dictionary.
:param event: the event to convert
:return: the event as a dictionary
"""
d = {'_name': event.name, '_timestamp': event.timestamp}
if hasattr(event, _DISCARD) and event[_DISCARD] > 0:
print(event[_DISCARD])
for key in [key for key in event.keys() if key not in _IGNORED_FIELDS]:
d[key] = event[key]
return d
def get_event_names(events: List[DictEvent]) -> List[str]:
"""
Get a list of names of the events in the trace.
:param events: the events of the trace
:return: the list of event names
"""
return [get_event_name(e) for e in events]
def get_field(event: DictEvent, field_name: str, default=None, raise_if_not_found=True) -> Any:
field_value = event.get(field_name, default)
# If enabled, raise exception as soon as possible to avoid headaches
if raise_if_not_found and field_value is None:
raise AttributeError(f'event field "{field_name}" not found!')
return field_value
def get_event_name(event: DictEvent) -> str:
return event['_name']
def get_event_timestamp(event: DictEvent) -> int:
return event['_timestamp']
def get_procname(event: DictEvent) -> str:
return event['procname']