Merge branch 'add-kernel-analyses-with-profiling' into 'master'

Rework analysis architecture for kernel analyses and add profiling analysis

See merge request micro-ROS/ros_tracing/tracetools_analysis!11
This commit is contained in:
Christophe Bedard 2019-08-14 12:25:47 +00:00
commit 505701ff99
23 changed files with 2113 additions and 212 deletions

View file

@ -6,10 +6,13 @@ variables:
before_script:
- git clone https://gitlab.com/micro-ROS/ros_tracing/ros2_tracing.git
- rosdep install --from-paths . -i . -i /root/ws --rosdistro dashing -y
- . /root/ws/install/local_setup.sh
build:
script:
- colcon build --symlink-install --packages-up-to $PACKAGES_LIST
- . install/local_setup.sh
- colcon test --packages-select $PACKAGES_LIST
- colcon test-result
artifacts:

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,40 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example launch file for a profiling analysis."""
from launch import LaunchDescription
from launch_ros.actions import Node
from tracetools_launch.action import Trace
from tracetools_trace.tools.names import DEFAULT_EVENTS_ROS
def generate_launch_description():
    """Set up profiling-enabled tracing alongside the node under test."""
    # UST events needed by the profiling analysis, on top of the ROS defaults.
    profiling_events = [
        'lttng_ust_cyg_profile_fast:func_entry',
        'lttng_ust_cyg_profile_fast:func_exit',
    ]
    trace_action = Trace(
        session_name='profile',
        events_ust=profiling_events + DEFAULT_EVENTS_ROS,
        events_kernel=[
            'sched_switch',
        ],
    )
    node_action = Node(
        package='ros_performance',
        node_executable='nopub',
        output='screen',
    )
    return LaunchDescription([trace_action, node_action])

View file

@ -11,6 +11,7 @@
<author email="fixed-term.christophe.bourquebedard@de.bosch.com">Christophe Bedard</author>
<exec_depend>tracetools_read</exec_depend>
<exec_depend>python3-pandas</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>

View file

@ -1,3 +1,5 @@
import glob
from setuptools import find_packages
from setuptools import setup
@ -9,6 +11,7 @@ setup(
packages=find_packages(exclude=['test']),
data_files=[
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', glob.glob('launch/*.launch.py')),
],
install_requires=['setuptools'],
maintainer=(

View file

@ -0,0 +1,118 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from tracetools_analysis.processor import Dependant
from tracetools_analysis.processor import DependencySolver
class DepEmtpy(Dependant):
    """Stub dependant with no dependencies of its own.

    NOTE(review): the 'Emtpy' spelling is a typo, kept because the other
    stubs and tests reference this exact name.
    """

    def __init__(self, **kwargs) -> None:
        # Optional test parameter, forwarded through DependencySolver kwargs.
        self.myparam = kwargs.get('myparam', None)


class DepOne(Dependant):
    """Stub dependant that declares a single dependency on DepEmtpy."""

    @staticmethod
    def dependencies():
        return [DepEmtpy]


class DepOne2(Dependant):
    """Second stub depending on DepEmtpy, to exercise duplicate dependencies."""

    @staticmethod
    def dependencies():
        return [DepEmtpy]


class DepTwo(Dependant):
    """Stub dependant with two dependencies that share a common dependency."""

    @staticmethod
    def dependencies():
        return [DepOne, DepOne2]
class TestDependencySolver(unittest.TestCase):
    """Tests for DependencySolver ordering, instance reuse, and kwargs forwarding."""

    # NOTE: the previous no-op __init__(self, *args) override (which only
    # called super().__init__(*args)) has been removed; unittest.TestCase
    # already provides that behavior.

    def test_single_dep(self) -> None:
        """A declared dependency is instantiated and placed before its dependant."""
        depone_instance = DepOne()
        # DepEmtpy should be added before
        solution = DependencySolver(depone_instance).solve()
        self.assertEqual(len(solution), 2, 'solution length invalid')
        self.assertIsInstance(solution[0], DepEmtpy)
        self.assertIs(solution[1], depone_instance)

    def test_single_dep_existing(self) -> None:
        """An initially-given instance is reused, regardless of its position."""
        depempty_instance = DepEmtpy()
        depone_instance = DepOne()
        # Already in order
        solution = DependencySolver(depempty_instance, depone_instance).solve()
        self.assertEqual(len(solution), 2, 'solution length invalid')
        self.assertIs(solution[0], depempty_instance, 'wrong solution order')
        self.assertIs(solution[1], depone_instance, 'wrong solution order')
        # Out of order
        solution = DependencySolver(depone_instance, depempty_instance).solve()
        self.assertEqual(len(solution), 2, 'solution length invalid')
        self.assertIs(solution[0], depempty_instance, 'solution does not use existing instance')
        self.assertIs(solution[1], depone_instance, 'solution does not use existing instance')

    def test_duplicate_dependency(self) -> None:
        """A dependency shared by two dependants appears only once, first."""
        deptwo_instance = DepTwo()
        # DepOne and DepOne2 both depend on DepEmpty
        solution = DependencySolver(deptwo_instance).solve()
        self.assertEqual(len(solution), 4, 'solution length invalid')
        self.assertIsInstance(solution[0], DepEmtpy)
        self.assertIsInstance(solution[1], DepOne)
        self.assertIsInstance(solution[2], DepOne2)
        self.assertIs(solution[3], deptwo_instance)
        # Existing instance of DepEmpty, in order
        depempty_instance = DepEmtpy()
        solution = DependencySolver(depempty_instance, deptwo_instance).solve()
        self.assertEqual(len(solution), 4, 'solution length invalid')
        self.assertIsInstance(solution[0], DepEmtpy)
        self.assertIsInstance(solution[1], DepOne)
        self.assertIsInstance(solution[2], DepOne2)
        self.assertIs(solution[3], deptwo_instance)
        # Existing instance of DepEmpty, not in order
        solution = DependencySolver(deptwo_instance, depempty_instance).solve()
        self.assertEqual(len(solution), 4, 'solution length invalid')
        self.assertIsInstance(solution[0], DepEmtpy)
        self.assertIsInstance(solution[1], DepOne)
        self.assertIsInstance(solution[2], DepOne2)
        self.assertIs(solution[3], deptwo_instance)

    def test_kwargs(self) -> None:
        """kwargs given to the solver are passed on to newly-created instances."""
        depone_instance = DepOne()
        # Pass parameter and check that the new instance has it
        solution = DependencySolver(depone_instance, myparam='myvalue').solve()
        self.assertEqual(len(solution), 2, 'solution length invalid')
        self.assertEqual(solution[0].myparam, 'myvalue', 'parameter not passed on')


if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1,100 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
import unittest
from tracetools_analysis.processor import EventHandler
from tracetools_analysis.processor import EventMetadata
from tracetools_analysis.processor import Processor
class StubHandler1(EventHandler):
    """Stub handler for 'myeventname'; records whether its handler method ran."""

    def __init__(self) -> None:
        handler_map = {
            'myeventname': self._handler_whatever,
        }
        super().__init__(handler_map=handler_map)
        # Flipped to True by the handler method; checked by the tests.
        self.handler_called = False

    def _handler_whatever(
        self, event: Dict, metadata: EventMetadata
    ) -> None:
        self.handler_called = True


class StubHandler2(EventHandler):
    """Intentional duplicate of StubHandler1.

    A second, distinct handler class is needed to verify that handler maps
    from multiple handlers are merged for the same event name.
    """

    def __init__(self) -> None:
        handler_map = {
            'myeventname': self._handler_whatever,
        }
        super().__init__(handler_map=handler_map)
        self.handler_called = False

    def _handler_whatever(
        self, event: Dict, metadata: EventMetadata
    ) -> None:
        self.handler_called = True


class WrongHandler(EventHandler):
    """Handler whose method lacks the (event, metadata) parameters.

    Used to check that calling a mis-declared handler raises TypeError.
    """

    def __init__(self) -> None:
        handler_map = {
            'myeventname': self._handler_wrong,
        }
        super().__init__(handler_map=handler_map)

    def _handler_wrong(
        self,
    ) -> None:
        pass
class TestProcessor(unittest.TestCase):
    """Tests for Processor event dispatch and handler-map merging."""

    # NOTE: the previous no-op __init__(self, *args) override has been
    # removed; unittest.TestCase already provides that behavior.

    @staticmethod
    def _mock_event() -> Dict:
        """Build a minimal event matching the stub handlers' map.

        Factored out: both tests previously repeated this literal.
        """
        return {
            '_name': 'myeventname',
            '_timestamp': 0,
            'cpu_id': 0,
        }

    def test_handler_wrong_signature(self) -> None:
        """A handler method without (event, metadata) params raises TypeError."""
        handler = WrongHandler()
        processor = Processor(handler)
        with self.assertRaises(TypeError):
            processor.process([self._mock_event()])

    def test_handler_method_with_merge(self) -> None:
        """Every handler registered for the same event name is called."""
        handler1 = StubHandler1()
        handler2 = StubHandler2()
        processor = Processor(handler1, handler2)
        processor.process([self._mock_event()])
        self.assertTrue(handler1.handler_called, 'event handler not called')
        self.assertTrue(handler2.handler_called, 'event handler not called')


if __name__ == '__main__':
    unittest.main()

View file

@ -0,0 +1,320 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any
from typing import Dict
from typing import List
import unittest
from pandas import DataFrame
from pandas.util.testing import assert_frame_equal
from tracetools_analysis.processor import Processor
from tracetools_analysis.processor.profile import ProfileHandler
from tracetools_read import DictEvent
# TEST DATA
#
# + Threads:
# 0: does whatever
# 1: contains one instance of the functions of interest
# 2: contains another instance of the functions of interest
#
# + Functions structure
# function_a
# function_aa
# function_b
#
# + Timeline
# tid 1 2
# func a aa b a aa b
# time
# 0 : whatever
# 3 : sched_switch from tid 0 to tid 1
# 5 : tid 1, func_entry: function_a
# 7 : sched_switch from tid 1 to tid 0 2
# 10 : sched_switch from tid 0 to tid 2
# 11 : tid 2, func_entry: function_a
# 15 : sched_switch from tid 2 to tid 1 4
# 16 : tid 1, func_entry: function_aa 1
# 20 : sched_switch from tid 1 to tid 2 4 4
# 27 : tid 2, func_entry: function_aa 7
# 29 : sched_switch from tid 2 to tid 1 2 2
# 30 : tid 1, func_exit: (function_aa) 1 1
# 32 : sched_switch from tid 1 to tid 0 2
# 34 : sched_switch from tid 0 to tid 2
# 35 : tid 2, func_exit: (function_aa) 1 1
# 37 : tid 2, func_exit: (function_a) 2
# 39 : tid 2, func_entry: function_b
# 40 : tid 2, func_exit: (function_b) 1
# 41 : sched_switch from tid 2 to tid 1
# 42 : tid 1, func_exit: (function_a) 1
# 44 : tid 1, func_entry: function_b
# 47 : sched_switch from tid 1 to tid 0 3
# 49 : sched_switch from tid 0 to tid 1
# 60 : tid 1, func_exit: (function_b) 11
# 69 : sched_switch from tid 1 to tid 0
#
# total 11 5 14 16 3 1
def _sched_switch(timestamp: int, prev_tid: int, next_tid: int) -> dict:
    """Build a mock sched_switch kernel event."""
    return {
        '_name': 'sched_switch',
        '_timestamp': timestamp,
        'prev_tid': prev_tid,
        'next_tid': next_tid,
    }


def _func_entry(timestamp: int, vtid: int, addr: str) -> dict:
    """Build a mock userspace function-entry event."""
    return {
        '_name': 'lttng_ust_cyg_profile_fast:func_entry',
        '_timestamp': timestamp,
        'vtid': vtid,
        'addr': addr,
    }


def _func_exit(timestamp: int, vtid: int) -> dict:
    """Build a mock userspace function-exit event (no address field)."""
    return {
        '_name': 'lttng_ust_cyg_profile_fast:func_exit',
        '_timestamp': timestamp,
        'vtid': vtid,
    }


# Events matching the timeline in the comment block above, built via the
# helpers to avoid ~140 lines of repeated dict literals.
input_events = [
    _sched_switch(3, 0, 1),
    _func_entry(5, 1, '0xfA'),
    _sched_switch(7, 1, 0),
    _sched_switch(10, 0, 2),
    _func_entry(11, 2, '0xfA'),
    _sched_switch(15, 2, 1),
    _func_entry(16, 1, '0xfAA'),
    _sched_switch(20, 1, 2),
    _func_entry(27, 2, '0xfAA'),
    _sched_switch(29, 2, 1),
    _func_exit(30, 1),
    _sched_switch(32, 1, 0),
    _sched_switch(34, 0, 2),
    _func_exit(35, 2),
    _func_exit(37, 2),
    _func_entry(39, 2, '0xfB'),
    _func_exit(40, 2),
    _sched_switch(41, 2, 1),
    _func_exit(42, 1),
    _func_entry(44, 1, '0xfB'),
    _sched_switch(47, 1, 0),
    _sched_switch(49, 0, 1),
    _func_exit(60, 1),
    _sched_switch(69, 1, 0),
]


def _expected_row(
    tid: int,
    depth: int,
    function_name: str,
    parent_name: str,
    start_timestamp: int,
    duration: int,
    actual_duration: int,
) -> dict:
    """Build one expected profiling-result row."""
    return {
        'tid': tid,
        'depth': depth,
        'function_name': function_name,
        'parent_name': parent_name,
        'start_timestamp': start_timestamp,
        'duration': duration,
        'actual_duration': actual_duration,
    }


# Rows are ordered by function exit time (30, 35, 37, 40, 42, 60).
expected = [
    _expected_row(1, 1, '0xfAA', '0xfA', 16, 14, 5),
    _expected_row(2, 1, '0xfAA', '0xfA', 27, 8, 3),
    _expected_row(2, 0, '0xfA', None, 11, 26, 16),
    _expected_row(2, 0, '0xfB', None, 39, 1, 1),
    _expected_row(1, 0, '0xfA', None, 5, 37, 11),
    _expected_row(1, 0, '0xfB', None, 44, 16, 14),
]
class TestProfileHandler(unittest.TestCase):
    """End-to-end test: profiling events in, `times` DataFrame out."""

    # NOTE: the previous no-op __init__(self, *args) override has been
    # removed; unittest.TestCase already provides that behavior.

    @staticmethod
    def build_expected_df(expected_data: List[Dict[str, Any]]) -> DataFrame:
        """Build the expected result DataFrame, with columns in a fixed order."""
        # Make sure the columns are in the same order
        expected_df = DataFrame(columns=[
            'tid',
            'depth',
            'function_name',
            'parent_name',
            'start_timestamp',
            'duration',
            'actual_duration',
        ])
        # NOTE(review): DataFrame.append() is deprecated (removed in pandas
        # 2.0). Kept here because the data model uses the same pattern, so
        # resulting dtypes stay directly comparable — migrate both together.
        return expected_df.append(expected_data, ignore_index=True)

    @staticmethod
    def add_fake_fields(events: List[DictEvent]) -> None:
        """Add fields the processor requires but the test data does not vary."""
        # Actual value does not matter here; it just needs to be there
        for event in events:
            event['cpu_id'] = 69

    @classmethod
    def setUpClass(cls):
        """Process the input events once for all tests."""
        cls.add_fake_fields(input_events)
        cls.expected = cls.build_expected_df(expected)
        cls.handler = ProfileHandler()
        cls.processor = Processor(cls.handler)
        cls.processor.process(input_events)

    def test_profiling(self) -> None:
        """Compare the resulting times DataFrame against the expected one."""
        handler = self.__class__.handler
        expected_df = self.__class__.expected
        result_df = handler.get_data_model().times
        # Print both frames to ease debugging when the comparison fails.
        print('RESULT')
        print(result_df.to_string())
        print('EXPECTED')
        print(expected_df.to_string())
        assert_frame_equal(result_df, expected_df)


if __name__ == '__main__':
    unittest.main()

View file

@ -1,71 +0,0 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for event handler."""
import sys
from typing import Callable
from typing import Dict
from typing import List
from tracetools_read.utils import get_event_name
from tracetools_read.utils import get_field
from .lttng_models import EventMetadata
class EventHandler():
    """Base event handling class.

    Legacy implementation (this file is removed by this change in favor of
    the new processor module).
    """

    def __init__(self, handler_map: Dict[str, Callable[[Dict, EventMetadata], None]]) -> None:
        """
        Constructor.

        :param handler_map: the mapping from event name to handling method
        """
        self._handler_map = handler_map

    def handle_events(self, events: List[Dict[str, str]]) -> None:
        """
        Handle events by calling their handlers.

        :param events: the events to process
        """
        for event in events:
            self._handle(event)

    def _handle(self, event: Dict[str, str]) -> None:
        # Dispatch a single event to its registered handler, if any.
        event_name = get_event_name(event)
        handler_function = self._handler_map.get(event_name, None)
        if handler_function is not None:
            # Prefer the virtual (namespaced) pid/tid fields; fall back to
            # the plain pid/tid fields when the virtual ones are absent.
            pid = get_field(
                event,
                'vpid',
                default=get_field(
                    event,
                    'pid',
                    raise_if_not_found=False))
            tid = get_field(
                event,
                'vtid',
                default=get_field(
                    event,
                    'tid',
                    raise_if_not_found=False))
            timestamp = get_field(event, '_timestamp')
            procname = get_field(event, 'procname')
            metadata = EventMetadata(event_name, pid, tid, timestamp, procname)
            handler_function(event, metadata)
        else:
            # No handler registered for this event name: report and continue.
            print(f'unhandled event name: {event_name}', file=sys.stderr)

View file

@ -1,46 +0,0 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module LTTng traces/events models."""
class EventMetadata():
    """Container for event metadata.

    Legacy implementation (this file is removed by this change in favor of
    the new processor module's EventMetadata).
    """

    def __init__(self, event_name, pid, tid, timestamp, procname) -> None:
        """
        Constructor.

        :param event_name: the name of the event
        :param pid: the process id (may be None when not in the event)
        :param tid: the thread id (may be None when not in the event)
        :param timestamp: the event timestamp
        :param procname: the process name
        """
        self._event_name = event_name
        self._pid = pid
        self._tid = tid
        self._timestamp = timestamp
        self._procname = procname

    @property
    def event_name(self):
        """Get the event name."""
        return self._event_name

    @property
    def pid(self):
        """Get the process id."""
        return self._pid

    @property
    def tid(self):
        """Get the thread id."""
        return self._tid

    @property
    def timestamp(self):
        """Get the timestamp."""
        return self._timestamp

    @property
    def procname(self):
        """Get the process name."""
        return self._procname

View file

@ -16,7 +16,8 @@
from pickle import Pickler
from tracetools_read import utils
from tracetools_read import event_to_dict
from tracetools_read import get_trace_ctf_events
def ctf_to_pickle(trace_directory: str, target: Pickler) -> int:
@ -27,7 +28,7 @@ def ctf_to_pickle(trace_directory: str, target: Pickler) -> int:
:param target: the target pickle file to write to
:return: the number of events written
"""
ctf_events = utils._get_trace_ctf_events(trace_directory)
ctf_events = get_trace_ctf_events(trace_directory)
count = 0
count_written = 0
@ -35,7 +36,7 @@ def ctf_to_pickle(trace_directory: str, target: Pickler) -> int:
for event in ctf_events:
count += 1
pod = utils.event_to_dict(event)
pod = event_to_dict(event)
target.dump(pod)
count_written += 1

View file

@ -11,3 +11,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base data model module."""
class DataModel():
    """
    Container with pre-processed data for an analysis to use.

    Acts as a middle ground between raw trace events and the output data of
    an analysis; concrete subclasses hold pandas `DataFrame` objects directly.
    """

    def __init__(self) -> None:
        """Constructor."""
        pass

View file

@ -0,0 +1,59 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for CPU time data model."""
import pandas as pd
from . import DataModel
class CpuTimeDataModel(DataModel):
    """
    Container to model pre-processed CPU time data for analysis.

    Contains every duration instance.
    """

    def __init__(self) -> None:
        """Constructor."""
        super().__init__()
        # One row per scheduling slice.
        self.times = pd.DataFrame(columns=[
            'tid',
            'start_timestamp',
            'duration',
            'cpu_id',
        ])

    def add_duration(
        self,
        tid: int,
        start_timestamp: int,
        duration: int,
        cpu_id: int,
    ) -> None:
        """
        Add one duration row.

        :param tid: the thread id
        :param start_timestamp: the timestamp at which the slice starts
        :param duration: the duration of the slice
        :param cpu_id: the id of the CPU the slice ran on
        """
        data = {
            'tid': tid,
            'start_timestamp': start_timestamp,
            'duration': duration,
            'cpu_id': cpu_id,
        }
        # DataFrame.append() is deprecated (removed in pandas 2.0);
        # pd.concat() is the documented equivalent for a single-row add.
        self.times = pd.concat(
            [self.times, pd.DataFrame([data])],
            ignore_index=True,
        )

    def print_model(self) -> None:
        """Debug method to print every contained df."""
        print('====================CPU TIME DATA MODEL====================')
        tail = 20
        print(f'Times (tail={tail}):\n{self.times.tail(tail).to_string()}')
        print('===========================================================')

View file

@ -0,0 +1,69 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for profile data model."""
import pandas as pd
from . import DataModel
class ProfileDataModel(DataModel):
    """
    Container to model pre-processed profiling data for analysis.

    Duration is the time difference between the function entry and the function exit.
    Actual duration is the actual time spent executing the function (or a child function).
    """

    def __init__(self) -> None:
        """Constructor."""
        super().__init__()
        # One row per completed function call.
        self.times = pd.DataFrame(columns=[
            'tid',
            'depth',
            'function_name',
            'parent_name',
            'start_timestamp',
            'duration',
            'actual_duration',
        ])

    def add_duration(
        self,
        tid: int,
        depth: int,
        function_name: str,
        parent_name: str,
        start_timestamp: int,
        duration: int,
        actual_duration: int,
    ) -> None:
        """
        Add one function-call row.

        :param tid: the thread id
        :param depth: the call depth (0 for a top-level function)
        :param function_name: the name (or address) of the function
        :param parent_name: the name of the calling function, or None
        :param start_timestamp: the function entry timestamp
        :param duration: entry-to-exit time
        :param actual_duration: time actually spent executing
        """
        data = {
            'tid': tid,
            'depth': depth,
            'function_name': function_name,
            'parent_name': parent_name,
            'start_timestamp': start_timestamp,
            'duration': duration,
            'actual_duration': actual_duration,
        }
        # DataFrame.append() is deprecated (removed in pandas 2.0);
        # pd.concat() is the documented equivalent for a single-row add.
        self.times = pd.concat(
            [self.times, pd.DataFrame([data])],
            ignore_index=True,
        )

    def print_model(self) -> None:
        """Debug method to print every contained df."""
        print('====================PROFILE DATA MODEL====================')
        tail = 20
        print(f'Times (tail={tail}):\n{self.times.tail(tail).to_string()}')
        print('==========================================================')

View file

@ -12,21 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for data model."""
"""Module for ROS data model."""
import pandas as pd
from . import DataModel
class DataModel():
class RosDataModel(DataModel):
"""
Container to model pre-processed data for analysis.
Container to model pre-processed ROS data for analysis.
Contains data for an analysis to use. This is a middleground between trace events data and the
output data of an analysis. This aims to represent the data in a ROS-aware way.
It uses pandas DataFrames directly.
This aims to represent the data in a ROS-aware way.
"""
def __init__(self) -> None:
"""Constructor."""
super().__init__()
# Objects (one-time events, usually when something is created)
self.contexts = pd.DataFrame(columns=['context_handle',
'timestamp',
@ -145,7 +147,7 @@ class DataModel():
def print_model(self) -> None:
"""Debug method to print every contained df."""
print('====================DATA MODEL====================')
print('====================ROS DATA MODEL====================')
print(f'Contexts:\n{self.contexts.to_string()}')
print()
print(f'Nodes:\n{self.nodes.to_string()}')

View file

@ -18,8 +18,8 @@
import argparse
import time
from tracetools_analysis.analysis import load
from tracetools_analysis.analysis import ros2_processor
from tracetools_analysis.loading import load_pickle
from tracetools_analysis.processor.ros2 import Ros2Handler
def parse_args():
@ -31,13 +31,14 @@ def parse_args():
def main():
args = parse_args()
pickle_filename = args.pickle_file
start_time = time.time()
events = load.load_pickle(pickle_filename)
processor = ros2_processor.ros2_process(events)
events = load_pickle(pickle_filename)
ros2_handler = Ros2Handler.process(events)
time_diff = time.time() - start_time
print(f'processed {len(events)} events in {time_diff * 1000:.2f} ms')
processor.get_data_model().print_model()
ros2_handler.get_data_model().print_model()

View file

@ -0,0 +1,331 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base processor module."""
from collections import defaultdict
from typing import Callable
from typing import Dict
from typing import List
from typing import Set
from typing import Type
from tracetools_read import DictEvent
from tracetools_read import get_event_name
from tracetools_read import get_field
class EventMetadata():
    """Container for event metadata."""

    def __init__(
        self,
        event_name: str,
        timestamp: int,
        cpu_id: int,
        procname: str = None,
        pid: int = None,
        tid: int = None,
    ) -> None:
        """
        Constructor.

        Parameters with a default value of `None` are not mandatory,
        since they are not always present.
        """
        self._event_name = event_name
        self._timestamp = timestamp
        self._cpu_id = cpu_id
        self._procname = procname
        self._pid = pid
        self._tid = tid

    @property
    def event_name(self):
        """Get the event name."""
        return self._event_name

    @property
    def timestamp(self):
        """Get the event timestamp."""
        return self._timestamp

    @property
    def cpu_id(self):
        """Get the CPU id."""
        return self._cpu_id

    @property
    def procname(self):
        """Get the process name (may be None)."""
        return self._procname

    @property
    def pid(self):
        """Get the process id (may be None)."""
        return self._pid

    @property
    def tid(self):
        """Get the thread id (may be None)."""
        return self._tid


# Map from event name to a single handling function.
HandlerMap = Dict[str, Callable[[DictEvent, EventMetadata], None]]
# Map from event name to all handling functions registered for it
# (produced by Processor when merging handler maps).
HandlerMultimap = Dict[str, List[Callable[[DictEvent, EventMetadata], None]]]
class Dependant():
    """
    Object which depends on other types.

    A dependant depends on other types which might have dependencies themselves.
    Dependencies are type-related only.
    """

    @staticmethod
    def dependencies() -> List[Type['Dependant']]:
        """
        Get the dependencies that should also exist along with this current one.

        Subclasses should override this method if they want to declare dependencies.
        Default: no dependencies.

        :return: the list of dependency types, in order
        """
        return []
class EventHandler(Dependant):
    """
    Base event handling class.

    Provides handling functions for some events, depending on the name.
    To be subclassed.
    """

    def __init__(
        self,
        *,
        handler_map: HandlerMap,
        **kwargs,
    ) -> None:
        """
        Constructor.

        TODO make subclasses pass on their *DataModel to this class

        :param handler_map: the mapping from event name to handling method
        """
        # NOTE(review): `assert` is stripped under `python -O`, so this
        # validation is best-effort only.
        assert handler_map is not None and len(handler_map) > 0, \
            f'empty map: {self.__class__.__name__}'
        self._handler_map = handler_map
        # Set by register_processor() once a Processor takes ownership.
        self.processor = None

    @property
    def handler_map(self) -> HandlerMap:
        """Get the handler functions map."""
        return self._handler_map

    def register_processor(self, processor: 'Processor') -> None:
        """Register processor with this `EventHandler` so that it can query other handlers."""
        self.processor = processor

    @classmethod
    def process(cls, events: List[DictEvent], **kwargs) -> 'EventHandler':
        """
        Create a `Processor` and process an instance of the class.

        :param events: the list of events
        :param kwargs: the parameters to pass to the handler and the `Processor`
        :return: the handler object after processing
        """
        assert cls != EventHandler, 'only call process() from inheriting classes'
        # kwargs are forwarded both to the handler's constructor and to the
        # Processor (which passes them on to new dependency instances).
        handler_object = cls(**kwargs)  # pylint: disable=all
        processor = Processor(handler_object, **kwargs)
        processor.process(events)
        return handler_object
class DependencySolver():
    """
    Solve `Dependant` dependencies.

    Post-order depth-first search (ish). Does not check for circular dependencies or other errors.
    """

    def __init__(
        self,
        *initial_dependants: Dependant,
        **kwargs,
    ) -> None:
        """
        Constructor.

        :param initial_dependants: the initial dependant instances, in order
        :param kwargs: the parameters to pass on to new instances
        """
        self._initial_deps = list(initial_dependants)
        self._kwargs = kwargs

    def solve(self) -> List[Dependant]:
        """
        Solve.

        :return: the solved list, including at least the initial dependants, in order
        """
        visited: Set[Type[Dependant]] = set()
        solution: List[Dependant] = []
        # Map the type of each given instance to the instance itself, so
        # that a dependency on one of those types reuses the given instance
        # instead of creating a new one.
        initial_map = {type(dep_instance): dep_instance for dep_instance in self._initial_deps}
        for dep_instance in self._initial_deps:
            self.__solve_instance(
                dep_instance,
                visited,
                initial_map,
                solution,
            )
        return solution

    def __solve_instance(
        self,
        dep_instance: Dependant,
        visited: Set[Type[Dependant]],
        initial_map: Dict[Type[Dependant], Dependant],
        solution: List[Dependant],
    ) -> None:
        # Post-order: resolve this instance's dependencies first, then append
        # the instance itself. A no-op if an instance of the same type was
        # already placed in the solution (type-level deduplication).
        if type(dep_instance) not in visited:
            for dependency_type in type(dep_instance).dependencies():
                self.__solve_type(
                    dependency_type,
                    visited,
                    initial_map,
                    solution,
                )
            solution.append(dep_instance)
            visited.add(type(dep_instance))

    def __solve_type(
        self,
        dep_type: Type[Dependant],
        visited: Set[Type[Dependant]],
        initial_map: Dict[Type[Dependant], Dependant],
        solution: List[Dependant],
    ) -> None:
        # Same post-order resolution, starting from a type rather than an
        # instance; instantiates the type (with the stored kwargs) unless an
        # instance was given initially.
        if dep_type not in visited:
            for dependency_type in dep_type.dependencies():
                self.__solve_type(
                    dependency_type,
                    visited,
                    initial_map,
                    solution,
                )
            # If an instance of this type was given initially, use it instead
            new_instance = None
            if dep_type in initial_map:
                new_instance = initial_map.get(dep_type)
            else:
                new_instance = dep_type(**self._kwargs)
            solution.append(new_instance)
            visited.add(dep_type)
class Processor():
    """Base processor class."""

    def __init__(
        self,
        *handlers: EventHandler,
        **kwargs,
    ) -> None:
        """
        Constructor.

        :param handlers: the `EventHandler`s to use for processing
        :param kwargs: the parameters to pass on to new handlers
        """
        expanded_handlers = self._expand_dependencies(*handlers, **kwargs)
        self._handler_multimap = self._get_handler_maps(expanded_handlers)
        self._register_with_handlers(expanded_handlers)

    @staticmethod
    def _expand_dependencies(
        *handlers: EventHandler,
        **kwargs,
    ) -> List[EventHandler]:
        """
        Check handlers and add dependencies if not included.

        :param handlers: the list of primary `EventHandler`s
        :param kwargs: the parameters to pass on to new instances
        :return: the dependency-ordered list of handlers
        """
        return DependencySolver(*handlers, **kwargs).solve()

    @staticmethod
    def _get_handler_maps(
        handlers: List[EventHandler],
    ) -> HandlerMultimap:
        """
        Collect and merge `HandlerMap`s from all events handlers into a `HandlerMultimap`.

        :param handlers: the list of handlers
        :return: the merged multimap
        """
        handler_multimap = defaultdict(list)
        for handler in handlers:
            # Use a distinct name for the mapped function: the original code
            # rebound `handler` here, shadowing the outer loop variable.
            for event_name, handler_function in handler.handler_map.items():
                handler_multimap[event_name].append(handler_function)
        return handler_multimap

    def _register_with_handlers(
        self,
        handlers: List[EventHandler],
    ) -> None:
        """
        Register this processor with its `EventHandler`s.

        :param handlers: the list of handlers
        """
        for handler in handlers:
            handler.register_processor(self)

    def process(self, events: List[DictEvent]) -> None:
        """
        Process all events.

        :param events: the events to process
        """
        for event in events:
            self._process_event(event)

    def _process_event(self, event: DictEvent) -> None:
        """Process a single event by dispatching it to all matching handlers."""
        event_name = get_event_name(event)
        handler_functions = self._handler_multimap.get(event_name, None)
        if handler_functions is None:
            return
        # Extract the metadata once per event: it is identical for every
        # handler function (previously re-extracted inside the loop).
        timestamp = get_field(event, '_timestamp')
        cpu_id = get_field(event, 'cpu_id')
        # TODO perhaps validate fields depending on the type of event,
        # i.e. all UST events should have procname, (v)pid and (v)tid
        # context info, since analyses might not work otherwise
        procname = get_field(event, 'procname', raise_if_not_found=False)
        # Prefer the virtual (namespaced) pid/tid fields; fall back to the
        # plain pid/tid fields when the virtual ones are absent.
        pid = get_field(
            event,
            'vpid',
            default=get_field(
                event,
                'pid',
                raise_if_not_found=False),
            raise_if_not_found=False)
        tid = get_field(
            event,
            'vtid',
            default=get_field(
                event,
                'tid',
                raise_if_not_found=False),
            raise_if_not_found=False)
        metadata = EventMetadata(event_name, timestamp, cpu_id, procname, pid, tid)
        for handler_function in handler_functions:
            handler_function(event, metadata)

View file

@ -0,0 +1,67 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for CPU time events processing."""
from typing import Dict
from tracetools_read import get_field
from . import EventHandler
from . import EventMetadata
from ..data_model.cpu_time import CpuTimeDataModel
class CpuTimeHandler(EventHandler):
    """
    Handler that extracts data for CPU time.

    It extracts timestamps from sched_switch events to later compute CPU time per thread.
    """

    def __init__(
        self,
        **kwargs,
    ) -> None:
        # Map the event of interest to its handling method
        super().__init__(
            handler_map={
                'sched_switch':
                    self._handle_sched_switch,
            },
            **kwargs,
        )
        self._data = CpuTimeDataModel()
        # Temporary buffers
        # cpu_id -> start timestamp of the running thread
        self._cpu_start: Dict[int, int] = {}

    def get_data_model(self) -> CpuTimeDataModel:
        return self._data

    def _handle_sched_switch(
        self, event: Dict, metadata: EventMetadata
    ) -> None:
        switch_timestamp = metadata.timestamp
        cpu = metadata.cpu_id
        # Process if there is a previous thread timestamp
        # TODO instead of discarding it, use first ever timestamp
        # of the trace (with TraceCollection.timestamp_begin)
        start_of_previous = self._cpu_start.get(cpu, None)
        if start_of_previous is not None:
            previous_tid = get_field(event, 'prev_tid')
            self._data.add_duration(
                previous_tid,
                start_of_previous,
                switch_timestamp - start_of_previous,
                cpu,
            )
        # Set start timestamp of next thread
        self._cpu_start[cpu] = switch_timestamp

View file

@ -0,0 +1,152 @@
# Copyright 2019 Robert Bosch GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for profile events processing."""
from collections import defaultdict
from typing import Dict
from typing import List
from typing import Union
from tracetools_read import get_field
from . import EventHandler
from . import EventMetadata
from ..data_model.profile import ProfileDataModel
class ProfileHandler(EventHandler):
    """
    Handler that extracts profiling information.

    It uses the following events:
        * lttng_ust_cyg_profile_fast:func_entry
        * lttng_ust_cyg_profile_fast:func_exit
        * sched_switch

    Function entry/exit events maintain a per-thread stack of currently
    executing functions; sched_switch events pause/resume the "actual
    duration" accounting while a thread is scheduled out.
    """

    def __init__(
        self,
        **kwargs,
    ) -> None:
        """
        Constructor.

        :param kwargs: the parameters to pass on to the base `EventHandler`
        """
        handler_map = {
            'lttng_ust_cyg_profile_fast:func_entry':
                self._handle_function_entry,
            'lttng_ust_cyg_profile_fast:func_exit':
                self._handle_function_exit,
            'sched_switch':
                self._handle_sched_switch,
        }
        super().__init__(handler_map=handler_map, **kwargs)
        self._data = ProfileDataModel()
        # Temporary buffers
        # tid ->
        #   list:[
        #     functions currently executing (ordered by relative depth), with info:
        #     [
        #       function name,
        #       start timestamp,
        #       last execution start timestamp of the function
        #       (None while the thread is scheduled out),
        #       total duration,
        #     ]
        #   ]
        self._current_funcs: Dict[int, List[List[Union[str, int, None]]]] = defaultdict(list)

    def get_data_model(self) -> ProfileDataModel:
        """Get the profiling data model."""
        return self._data

    def _handle_sched_switch(
        self, event: Dict, metadata: EventMetadata
    ) -> None:
        """Pause/resume duration accounting for threads being switched out/in."""
        timestamp = metadata.timestamp
        # If function(s) currently running stop(s) executing
        prev_tid = get_field(event, 'prev_tid')
        if prev_tid in self._current_funcs:
            # Increment durations using last start timestamp
            for info in self._current_funcs.get(prev_tid):
                last_start = info[2]
                total_duration = info[3]
                total_duration += timestamp - last_start
                # Mark as not currently executing; resumed on switch-in below
                info[2] = None
                info[3] = total_duration
        # If stopped function(s) start(s) executing again
        next_tid = get_field(event, 'next_tid')
        if next_tid in self._current_funcs:
            # Set last start timestamp to now
            for info in self._current_funcs.get(next_tid):
                assert info[2] is None
                info[2] = timestamp

    def _handle_function_entry(
        self, event: Dict, metadata: EventMetadata
    ) -> None:
        """Push a new function onto the calling thread's stack."""
        function_name = self._get_function_name(event)
        # Push function data to stack, setting both timestamps to now
        self._current_funcs[metadata.tid].append([
            function_name,
            metadata.timestamp,
            metadata.timestamp,
            0,
        ])

    def _handle_function_exit(
        self, event: Dict, metadata: EventMetadata
    ) -> None:
        """Pop the finished function and record its durations in the model."""
        # Pop function data from stack
        tid = metadata.tid
        tid_functions = self._current_funcs[tid]
        function_depth = len(tid_functions) - 1
        info = tid_functions.pop()
        function_name = info[0]
        start_timestamp = info[1]
        last_start_timestamp = info[2]
        total_duration = info[3]
        # Add to data model
        # Parent is the function now at the top of the stack, if any
        parent_name = tid_functions[-1][0] if function_depth > 0 else None
        # Wall-clock duration vs. duration actually spent on-CPU
        duration = metadata.timestamp - start_timestamp
        actual_duration = (metadata.timestamp - last_start_timestamp) + total_duration
        self._data.add_duration(
            tid,
            function_depth,
            function_name,
            parent_name,
            start_timestamp,
            duration,
            actual_duration,
        )

    def _get_function_name(
        self, event: Dict
    ) -> str:
        """Get a function name for the event's address, falling back to the raw address."""
        address = get_field(event, 'addr')
        resolution = self._resolve_function_address(address)
        if resolution is None:
            resolution = str(address)
        return resolution

    def _resolve_function_address(
        self, address: int
    ) -> Union[str, None]:
        """
        Resolve a function address to a name, or `None` if unknown.

        NOTE(review): this is a hard-coded address map for one specific
        binary — see TODO below for the real resolution mechanism.
        """
        # TODO get debug_info from babeltrace for
        # lttng_ust_cyg_profile_fast:func_entry events
        # (or resolve { address -> function } name another way)
        address_to_func = {
            int('0x7F3418EC4DB4', 16): 'get_next_ready_executable',
            int('0x7F3418EC3C54', 16): 'wait_for_work',
            int('0x7F3418EE50F8', 16): 'collect_entities',
        }
        return address_to_func.get(address, None)

View file

@ -15,23 +15,25 @@
"""Module for trace events processor and ROS model creation."""
from typing import Dict
from typing import List
from tracetools_read.utils import get_field
from tracetools_read import get_field
from .data_model import DataModel
from .handler import EventHandler
from .lttng_models import EventMetadata
from . import EventHandler
from . import EventMetadata
from ..data_model.ros import RosDataModel
class Ros2Processor(EventHandler):
class Ros2Handler(EventHandler):
"""
ROS 2-aware event processing/handling class implementation.
ROS 2-aware event handling class implementation.
Handles a trace's events and builds a model with the data.
"""
def __init__(self) -> None:
def __init__(
self,
**kwargs,
) -> None:
# Link a ROS trace event to its corresponding handling method
handler_map = {
'ros2:rcl_init':
@ -61,14 +63,14 @@ class Ros2Processor(EventHandler):
'ros2:callback_end':
self._handle_callback_end,
}
super().__init__(handler_map)
super().__init__(handler_map=handler_map, **kwargs)
self._data = DataModel()
self._data = RosDataModel()
# Temporary buffers
self._callback_instances = {}
def get_data_model(self) -> DataModel:
def get_data_model(self) -> RosDataModel:
return self._data
def _handle_rcl_init(
@ -198,15 +200,3 @@ class Ros2Processor(EventHandler):
bool(is_intra_process))
else:
print(f'No matching callback start for callback object "{callback_object}"')
def ros2_process(events: List[Dict[str, str]]) -> Ros2Processor:
"""
Process unpickled events and create ROS 2 model.
:param events: the list of events
:return: the processor object
"""
processor = Ros2Processor()
processor.handle_events(events)
return processor

View file

@ -12,32 +12,118 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for data model utility class."""
"""Module for data model utility classes."""
from collections import defaultdict
from datetime import datetime as dt
from typing import Any
from typing import Dict
from typing import List
from typing import Mapping
from typing import Set
from typing import Union
from pandas import DataFrame
from .data_model import DataModel
from .data_model.cpu_time import CpuTimeDataModel
from .data_model.profile import ProfileDataModel
from .data_model.ros import RosDataModel
class DataModelUtil():
class ProfileDataModelUtil():
"""
Data model utility class.
Profiling data model utility class.
Provides functions to get info on a data model.
Provides functions to get more info on the data.
"""
def __init__(self, data_model: DataModel) -> None:
def __init__(self, data_model: ProfileDataModel) -> None:
"""
Constructor.
:param data_model: the data model object to use
"""
self._data = data_model
self.data = data_model
def with_tid(self, tid: int) -> DataFrame:
    """Get the subset of profiling rows belonging to the given thread."""
    times = self.data.times
    mask = times['tid'] == tid
    return times.loc[mask]
def get_tids(self) -> Set[int]:
    """Get the set of all thread IDs present in the data model."""
    return {thread_id for thread_id in self.data.times['tid']}
def get_call_tree(self, tid: int) -> Dict[str, Set[str]]:
    """
    Get the function call tree for a given thread.

    Note: the return annotation previously claimed `List[str]` values,
    but the values are in fact sets of callee names.

    :param tid: the thread ID
    :return: a mapping from function name to the set of functions it calls
        (a depth-0 function with no callees maps to an empty set)
    """
    depth_names = self.with_tid(tid)[
        ['depth', 'function_name', 'parent_name']
    ].drop_duplicates()
    tree: Dict[str, Set[str]] = defaultdict(set)
    for _, row in depth_names.iterrows():
        depth = row['depth']
        name = row['function_name']
        parent = row['parent_name']
        if depth == 0:
            # Touch the entry so root functions appear even without callees
            tree[name]
        else:
            tree[parent].add(name)
    return dict(tree)
def get_function_duration_data(self, tid: int) -> List[Dict[str, Union[int, str, DataFrame]]]:
    """Get duration data for each function."""
    thread_df = self.with_tid(tid)
    unique_funcs = thread_df[['depth', 'function_name', 'parent_name']].drop_duplicates()
    results = []
    for _, func_row in unique_funcs.iterrows():
        func_depth = func_row['depth']
        func_name = func_row['function_name']
        func_parent = func_row['parent_name']
        selection = (
            (thread_df['depth'] == func_depth) &
            (thread_df['function_name'] == func_name)
        )
        durations = thread_df.loc[selection][
            ['start_timestamp', 'duration', 'actual_duration']
        ]
        results.append({
            'depth': func_depth,
            'function_name': func_name,
            'parent_name': func_parent,
            'data': durations,
        })
    return results
class CpuTimeDataModelUtil():
    """
    CPU time data model utility class.

    Provides functions to get info on a CPU time data model.
    """

    def __init__(self, data_model: CpuTimeDataModel) -> None:
        """
        Constructor.

        :param data_model: the data model object to use
        """
        self.data = data_model

    def get_time_per_thread(self) -> DataFrame:
        """Get a DataFrame of total duration for each thread."""
        tid_durations = self.data.times.loc[:, ['tid', 'duration']]
        return tid_durations.groupby(by='tid').sum()
class RosDataModelUtil():
"""
ROS data model utility class.
Provides functions to get info on a ROS data model.
"""
def __init__(self, data_model: RosDataModel) -> None:
    """
    Create a utility instance wrapping the given ROS data model.

    :param data_model: the data model object to use
    """
    self.data = data_model
def _prettify(self, original: str) -> str:
"""
@ -100,8 +186,8 @@ class DataModelUtil():
:return: the map
"""
callback_instances = self._data.callback_instances
callback_symbols = self._data.callback_symbols
callback_instances = self.data.callback_instances
callback_symbols = self.data.callback_symbols
# Get a list of callback objects
callback_objects = set(callback_instances['callback_object'])
@ -120,8 +206,8 @@ class DataModelUtil():
:return: a dataframe containing the start timestamp (datetime)
and duration (ms) of all callback instances for that object
"""
data = self._data.callback_instances.loc[
self._data.callback_instances.loc[:, 'callback_object'] == callback_obj,
data = self.data.callback_instances.loc[
self.data.callback_instances.loc[:, 'callback_object'] == callback_obj,
['timestamp', 'duration']
]
# Transform both columns to ms
@ -132,6 +218,35 @@ class DataModelUtil():
data['timestamp'] = data['timestamp'].apply(lambda t: dt.fromtimestamp(t / 1000.0))
return data
def get_node_tid_from_name(
    self, node_name: str
) -> Union[int, None]:
    """
    Get the tid corresponding to a node.

    :param node_name: the name of the node
    :return: the tid, or `None` if not found
    """
    nodes = self.data.nodes
    # Assuming there is only one row with the given name
    matching = nodes.loc[nodes['name'] == node_name]
    assert len(matching.index) <= 1, 'more than 1 node found'
    if matching.empty:
        return None
    return matching.iloc[0]['tid']
def get_node_names_from_tid(
    self, tid: int
) -> List[str]:
    """
    Get the list of node names corresponding to a tid.

    Note: the parameter was previously annotated `str`, but a tid is an
    int, and `.tolist()` always returns a (possibly empty) list rather
    than `None`.

    :param tid: the tid
    :return: the list of node names (empty if none found)
    """
    return self.data.nodes[
        self.data.nodes['tid'] == tid
    ]['name'].tolist()
def get_callback_owner_info(
self, callback_obj: int
) -> Union[str, None]:
@ -147,26 +262,26 @@ class DataModelUtil():
:return: information about the owner of the callback, or `None` if it fails
"""
# Get handle corresponding to callback object
handle = self._data.callback_objects.loc[
self._data.callback_objects['callback_object'] == callback_obj
handle = self.data.callback_objects.loc[
self.data.callback_objects['callback_object'] == callback_obj
].index.values.astype(int)[0]
type_name = None
info = None
# Check if it's a timer first (since it's slightly different than the others)
if handle in self._data.timers.index:
if handle in self.data.timers.index:
type_name = 'Timer'
info = self.get_timer_handle_info(handle)
elif handle in self._data.publishers.index:
elif handle in self.data.publishers.index:
type_name = 'Publisher'
info = self.get_publisher_handle_info(handle)
elif handle in self._data.subscriptions.index:
elif handle in self.data.subscriptions.index:
type_name = 'Subscription'
info = self.get_subscription_handle_info(handle)
elif handle in self._data.services.index:
elif handle in self.data.services.index:
type_name = 'Service'
info = self.get_subscription_handle_info(handle)
elif handle in self._data.clients.index:
elif handle in self.data.clients.index:
type_name = 'Client'
info = self.get_client_handle_info(handle)
@ -184,11 +299,11 @@ class DataModelUtil():
:return: a dictionary with name:value info, or `None` if it fails
"""
# TODO find a way to link a timer to a specific node
if timer_handle not in self._data.timers.index:
if timer_handle not in self.data.timers.index:
return None
tid = self._data.timers.loc[timer_handle, 'tid']
period_ns = self._data.timers.loc[timer_handle, 'period']
tid = self.data.timers.loc[timer_handle, 'tid']
period_ns = self.data.timers.loc[timer_handle, 'period']
period_ms = period_ns / 1000000.0
return {'tid': tid, 'period': f'{period_ms:.0f} ms'}
@ -201,12 +316,12 @@ class DataModelUtil():
:param publisher_handle: the publisher handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if publisher_handle not in self._data.publishers.index:
if publisher_handle not in self.data.publishers.index:
return None
node_handle = self._data.publishers.loc[publisher_handle, 'node_handle']
node_handle = self.data.publishers.loc[publisher_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
topic_name = self._data.publishers.loc[publisher_handle, 'topic_name']
topic_name = self.data.publishers.loc[publisher_handle, 'topic_name']
publisher_info = {'topic': topic_name}
return {**node_handle_info, **publisher_info}
@ -219,11 +334,11 @@ class DataModelUtil():
:param subscription_handle: the subscription handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
subscriptions_info = self._data.subscriptions.merge(
self._data.nodes,
subscriptions_info = self.data.subscriptions.merge(
self.data.nodes,
left_on='node_handle',
right_index=True)
if subscription_handle not in self._data.subscriptions.index:
if subscription_handle not in self.data.subscriptions.index:
return None
node_handle = subscriptions_info.loc[subscription_handle, 'node_handle']
@ -241,12 +356,12 @@ class DataModelUtil():
:param service_handle: the service handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if service_handle not in self._data.services:
if service_handle not in self.data.services:
return None
node_handle = self._data.services.loc[service_handle, 'node_handle']
node_handle = self.data.services.loc[service_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
service_name = self._data.services.loc[service_handle, 'service_name']
service_name = self.data.services.loc[service_handle, 'service_name']
service_info = {'service': service_name}
return {**node_handle_info, **service_info}
@ -259,12 +374,12 @@ class DataModelUtil():
:param client_handle: the client handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if client_handle not in self._data.clients:
if client_handle not in self.data.clients:
return None
node_handle = self._data.clients.loc[client_handle, 'node_handle']
node_handle = self.data.clients.loc[client_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
service_name = self._data.clients.loc[client_handle, 'service_name']
service_name = self.data.clients.loc[client_handle, 'service_name']
service_info = {'service': service_name}
return {**node_handle_info, **service_info}
@ -277,11 +392,11 @@ class DataModelUtil():
:param node_handle: the node handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if node_handle not in self._data.nodes.index:
if node_handle not in self.data.nodes.index:
return None
node_name = self._data.nodes.loc[node_handle, 'name']
tid = self._data.nodes.loc[node_handle, 'tid']
node_name = self.data.nodes.loc[node_handle, 'name']
tid = self.data.nodes.loc[node_handle, 'tid']
return {'node': node_name, 'tid': tid}
def format_info_dict(self, info_dict: Mapping[str, Any]) -> str: