Merge branch 'setup-mypy' into 'master'

Add mypy tests through ament_mypy and fix typing

See merge request micro-ROS/ros_tracing/tracetools_analysis!55
This commit is contained in:
Christophe Bedard 2020-03-08 20:34:34 +00:00
commit 4bc1e6ed1f
26 changed files with 249 additions and 121 deletions

View file

@ -13,6 +13,7 @@
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_mypy</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>python3-pytest</test_depend>

View file

@ -0,0 +1,22 @@
# Copyright 2019 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_mypy.main import main
import pytest
@pytest.mark.mypy
@pytest.mark.linter
def test_mypy():
    """Run ament_mypy over the package and fail the test on any typing error."""
    # A zero return code from ament_mypy's entry point means no mypy errors.
    return_code = main(argv=[])
    assert return_code == 0, 'Found errors'

View file

@ -15,6 +15,7 @@
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_mypy</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>python3-pytest</test_depend>

View file

@ -0,0 +1,22 @@
# Copyright 2019 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_mypy.main import main
import pytest
@pytest.mark.mypy
@pytest.mark.linter
def test_mypy():
    """Run ament_mypy over the package and fail the test on any typing error."""
    # ament_mypy's main() returns 0 when the type check is clean.
    result = main(argv=[])
    assert 0 == result, 'Found errors'

View file

@ -13,12 +13,13 @@
# limitations under the License.
from typing import Dict
from typing import List
from typing import Set
import unittest
from tracetools_analysis.processor import AutoProcessor
from tracetools_analysis.processor import EventHandler
from tracetools_analysis.processor import EventMetadata
from tracetools_analysis.processor import HandlerMap
class AbstractEventHandler(EventHandler):
@ -32,18 +33,18 @@ class AbstractEventHandler(EventHandler):
class SubSubEventHandler(AbstractEventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myeventname': self._handler_whatever,
'myeventname69': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'myeventname',
'myeventname69',
]
}
def _handler_whatever(
self, event: Dict, metadata: EventMetadata
@ -54,16 +55,16 @@ class SubSubEventHandler(AbstractEventHandler):
class SubSubEventHandler2(AbstractEventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myeventname2': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'myeventname2',
]
}
def _handler_whatever(
self, event: Dict, metadata: EventMetadata
@ -74,16 +75,16 @@ class SubSubEventHandler2(AbstractEventHandler):
class SubEventHandler(EventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myeventname3': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'myeventname3',
]
}
def _handler_whatever(
self, event: Dict, metadata: EventMetadata

View file

@ -23,6 +23,7 @@ from pandas.util.testing import assert_frame_equal
from tracetools_analysis.data_model import DataModel
from tracetools_analysis.processor import EventHandler
from tracetools_analysis.processor import EventMetadata
from tracetools_analysis.processor import HandlerMap
from tracetools_analysis.utils import DataModelUtil
@ -113,13 +114,13 @@ class TestDataModelUtil(unittest.TestCase):
)
assert_frame_equal(input_df, expected_df)
def test_creation(self) -> None:
def handler_whatever(
self, event: Dict, metadata: EventMetadata
) -> None:
pass
handler_map = {'fake': handler_whatever}
def test_creation(self) -> None:
handler_map: HandlerMap = {'fake': self.handler_whatever}
data_model = DataModel()
# Should handle the event handler not having any data model

View file

@ -111,7 +111,7 @@ class TestDependencySolver(unittest.TestCase):
# Pass parameter and check that the new instance has it
solution = DependencySolver(depone_instance, myparam='myvalue').solve()
self.assertEqual(len(solution), 2, 'solution length invalid')
self.assertEqual(solution[0].myparam, 'myvalue', 'parameter not passed on')
self.assertEqual(solution[0].myparam, 'myvalue', 'parameter not passed on') # type: ignore
if __name__ == '__main__':

View file

@ -73,10 +73,10 @@ class TestProcessCommand(unittest.TestCase):
# Should fail to find converted file under directory
file_path, create_file = inspect_input_path(self.without_converted_file_dir, False)
self.assertIsNone(file_path)
self.assertIsNone(create_file)
self.assertFalse(create_file)
file_path, create_file = inspect_input_path(self.without_converted_file_dir, True)
self.assertIsNone(file_path)
self.assertIsNone(create_file)
self.assertFalse(create_file)
# Should accept any file path if it exists
file_path, create_file = inspect_input_path(self.random_file_path, False)

View file

@ -13,18 +13,19 @@
# limitations under the License.
from typing import Dict
from typing import List
from typing import Set
import unittest
from tracetools_analysis.processor import EventHandler
from tracetools_analysis.processor import EventMetadata
from tracetools_analysis.processor import HandlerMap
from tracetools_analysis.processor import Processor
class StubHandler1(EventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myeventname': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@ -39,7 +40,7 @@ class StubHandler1(EventHandler):
class StubHandler2(EventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myeventname': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@ -54,8 +55,8 @@ class StubHandler2(EventHandler):
class WrongHandler(EventHandler):
def __init__(self) -> None:
handler_map = {
'myeventname': self._handler_wrong,
handler_map: HandlerMap = {
'myeventname': self._handler_wrong, # type: ignore # intentionally wrong
}
super().__init__(handler_map=handler_map)
@ -68,17 +69,17 @@ class WrongHandler(EventHandler):
class MissingEventHandler(EventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myeventname': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'no_handler_for_this',
'myeventname',
]
}
def _handler_whatever(
self, event: Dict, metadata: EventMetadata
@ -89,16 +90,16 @@ class MissingEventHandler(EventHandler):
class EventHandlerWithRequiredEvent(EventHandler):
def __init__(self) -> None:
handler_map = {
handler_map: HandlerMap = {
'myrequiredevent': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'myrequiredevent',
]
}
def _handler_whatever(
self, event: Dict, metadata: EventMetadata

View file

@ -316,8 +316,8 @@ class TestProfileHandler(unittest.TestCase):
cls.processor.process(input_events)
def test_profiling(self) -> None:
handler = self.__class__.handler
expected_df = self.__class__.expected
handler = self.__class__.handler # type: ignore
expected_df = self.__class__.expected # type: ignore
result_df = handler.data.times
assert_frame_equal(result_df, expected_df)

View file

@ -19,7 +19,6 @@ import argparse
import os
import sys
import time
from typing import Optional
from tracetools_analysis.conversion import ctf
@ -50,7 +49,7 @@ def parse_args():
def convert(
trace_directory: str,
output_file_name: str = DEFAULT_CONVERT_FILE_NAME,
) -> Optional[int]:
) -> int:
"""
Convert trace directory to a file.
@ -71,6 +70,7 @@ def convert(
time_diff = time.time() - start_time
print(f'converted {count} events in {time_diff_to_str(time_diff)}')
print(f'output written to: {output_file_path}')
return 0
def main():

View file

@ -28,4 +28,4 @@ class DataModel():
def print_data(self) -> None:
"""Print the data model."""
return None
raise NotImplementedError

View file

@ -52,5 +52,5 @@ class MemoryUsageDataModel(DataModel):
def print_data(self) -> None:
print('==================MEMORY USAGE DATA MODEL==================')
tail = 20
print(f'Memory difference (tail={tail}):\n{self.times.tail(tail).to_string()}')
print(f'Memory difference (tail={tail}):\n{self.memory_diff.tail(tail).to_string()}')
print('===========================================================')

View file

@ -14,6 +14,8 @@
"""Module for profile data model."""
from typing import Optional
import pandas as pd
from . import DataModel
@ -45,7 +47,7 @@ class ProfileDataModel(DataModel):
tid: int,
depth: int,
function_name: str,
parent_name: str,
parent_name: Optional[str],
start_timestamp: int,
duration: int,
actual_duration: int,

View file

@ -19,6 +19,7 @@ import pickle
import sys
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from tracetools_read.trace import is_trace_directory
@ -30,7 +31,7 @@ from ..convert import DEFAULT_CONVERT_FILE_NAME
def inspect_input_path(
input_path: str,
force_conversion: bool = False,
) -> Tuple[str, bool]:
) -> Tuple[Optional[str], bool]:
"""
Check input path for a converted file or a trace directory.
@ -75,7 +76,7 @@ def inspect_input_path(
print(
f'cannot find either a trace directory or a converted file: {input_directory}',
file=sys.stderr)
return None, None
return None, False
else:
converted_file_path = input_path
if force_conversion:
@ -91,12 +92,13 @@ def inspect_input_path(
def convert_if_needed(
input_path: str,
force_conversion: bool = False,
) -> str:
) -> Optional[str]:
"""
Inspect input path and convert trace directory to file if necessary.
:param input_path: the path to a converted file or trace directory
:param force_conversion: whether to re-create converted file even if it is found
:return: the path to the converted file, or `None` if it failed
"""
converted_file_path, create_converted_file = inspect_input_path(input_path, force_conversion)
@ -130,6 +132,9 @@ def load_file(
else:
file_path = input_path
if file_path is None:
raise RuntimeError(f'could not use input path: {input_path}')
events = []
with open(os.path.expanduser(file_path), 'rb') as f:
p = pickle.Unpickler(f)

View file

@ -19,7 +19,6 @@ import argparse
import os
import sys
import time
from typing import Optional
from tracetools_analysis.loading import load_file
from tracetools_analysis.processor import Processor
@ -54,7 +53,7 @@ def process(
input_path: str,
force_conversion: bool = False,
hide_results: bool = False,
) -> Optional[int]:
) -> int:
"""
Process converted trace file.
@ -77,6 +76,7 @@ def process(
if not hide_results:
processor.print_data()
print(f'processed {len(events)} events in {time_diff_to_str(time_diff)}')
return 0
def main():

View file

@ -16,9 +16,11 @@
from collections import defaultdict
import sys
from types import ModuleType
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Type
from typing import Union
@ -80,8 +82,9 @@ class EventMetadata():
return self._tid
HandlerMap = Dict[str, Callable[[DictEvent, EventMetadata], None]]
HandlerMultimap = Dict[str, List[Callable[[DictEvent, EventMetadata], None]]]
HandlerMethod = Callable[[DictEvent, EventMetadata], None]
HandlerMap = Dict[str, HandlerMethod]
HandlerMultimap = Dict[str, List[HandlerMethod]]
class Dependant():
@ -92,6 +95,12 @@ class Dependant():
Dependencies are type-related only.
"""
def __init__(
self,
**kwargs,
) -> None:
pass
@staticmethod
def dependencies() -> List[Type['Dependant']]:
"""
@ -116,7 +125,7 @@ class EventHandler(Dependant):
self,
*,
handler_map: HandlerMap,
data_model: DataModel = None,
data_model: Optional[DataModel] = None,
**kwargs,
) -> None:
"""
@ -130,7 +139,7 @@ class EventHandler(Dependant):
assert all(required_name in handler_map.keys() for required_name in self.required_events())
self._handler_map = handler_map
self._data_model = data_model
self._processor = None
self._processor: Optional[Processor] = None
@property
def handler_map(self) -> HandlerMap:
@ -138,12 +147,12 @@ class EventHandler(Dependant):
return self._handler_map
@property
def data(self) -> Union[DataModel, None]:
def data(self) -> DataModel:
"""Get the data model."""
return self._data_model
return self._data_model # type: ignore
@property
def processor(self) -> 'Processor':
def processor(self) -> Optional['Processor']:
return self._processor
@staticmethod
@ -154,7 +163,7 @@ class EventHandler(Dependant):
Without these events, the EventHandler would be invalid/useless. Inheriting classes can
decide not to declare that they require specific events.
"""
return {}
return set()
def register_processor(
self,
@ -261,12 +270,11 @@ class DependencySolver():
solution,
)
# If an instance of this type was given initially, use it instead
new_instance = None
if dep_type in initial_map:
new_instance = initial_map.get(dep_type)
dep_type_instance = initial_map.get(dep_type, None)
if dep_type_instance is not None:
solution.append(dep_type_instance)
else:
new_instance = dep_type(**self._kwargs)
solution.append(new_instance)
solution.append(dep_type(**self._kwargs))
visited.add(dep_type)
@ -311,7 +319,7 @@ class Processor():
:param handlers: the list of primary `EventHandler`s
:param kwargs: the parameters to pass on to new instances
"""
return DependencySolver(*handlers, **kwargs).solve()
return DependencySolver(*handlers, **kwargs).solve() # type: ignore
@staticmethod
def _get_handler_maps(
@ -323,10 +331,10 @@ class Processor():
:param handlers: the list of handlers
:return: the merged multimap
"""
handler_multimap = defaultdict(list)
handler_multimap: HandlerMultimap = defaultdict(list)
for handler in handlers:
for event_name, handler in handler.handler_map.items():
handler_multimap[event_name].append(handler)
for event_name, handler_method in handler.handler_map.items():
handler_multimap[event_name].append(handler_method)
return handler_multimap
def _register_with_handlers(
@ -491,13 +499,13 @@ class AutoProcessor():
@staticmethod
def _get_applicable_event_handler_classes(
event_names: List[str],
handler_classes: List[Type[EventHandler]],
event_names: Set[str],
handler_classes: Set[Type[EventHandler]],
) -> Set[Type[EventHandler]]:
"""
Get applicable EventHandler subclasses for a list of event names.
Get applicable EventHandler subclasses for a set of event names.
:param event_names: the list of event names
:param event_names: the set of event names
:return: a set of EventHandler subclasses for which requirements are met
"""
return {
@ -542,14 +550,19 @@ class AutoProcessor():
@staticmethod
def _import_event_handler_submodules(
name: str = __name__,
recursive=True,
):
"""Force import of EventHandler submodules."""
recursive: bool = True,
) -> Dict[str, ModuleType]:
"""
Force import of EventHandler submodules.
:param name: the base module name
:param recursive: `True` if importing recursively, `False` otherwise
"""
import importlib
import pkgutil
package = importlib.import_module(name)
results = {}
for loader, name, is_pkg in pkgutil.walk_packages(package.__path__):
for loader, name, is_pkg in pkgutil.walk_packages(package.__path__): # type: ignore #1422
full_name = package.__name__ + '.' + name
results[full_name] = importlib.import_module(full_name)
if recursive and is_pkg:
@ -570,10 +583,10 @@ class ProcessingProgressDisplay():
:param processing_elements: the list of elements doing processing
"""
self.__info_string = '[' + ', '.join(processing_elements) + ']'
self.__total_work = None
self.__progress_count = None
self.__rolling_count = None
self.__work_display_period = None
self.__total_work: int = 0
self.__progress_count: int = 0
self.__rolling_count: int = 0
self.__work_display_period: int = 0
def set_work_total(
self,

View file

@ -21,6 +21,7 @@ from tracetools_read import get_field
from . import EventHandler
from . import EventMetadata
from . import HandlerMap
from ..data_model.cpu_time import CpuTimeDataModel
@ -37,7 +38,7 @@ class CpuTimeHandler(EventHandler):
) -> None:
"""Create a CpuTimeHandler."""
# Link event to handling method
handler_map = {
handler_map: HandlerMap = {
'sched_switch':
self._handle_sched_switch,
}
@ -57,6 +58,10 @@ class CpuTimeHandler(EventHandler):
'sched_switch',
}
@property
def data(self) -> CpuTimeDataModel:
return super().data # type: ignore
def _handle_sched_switch(
self, event: Dict, metadata: EventMetadata
) -> None:

View file

@ -21,6 +21,7 @@ from tracetools_read import get_field
from . import EventHandler
from . import EventMetadata
from . import HandlerMap
from ..data_model.memory_usage import MemoryUsageDataModel
@ -38,6 +39,10 @@ class MemoryUsageHandler(EventHandler):
**kwargs,
)
@property
def data(self) -> MemoryUsageDataModel:
return super().data # type: ignore
def _update(
self,
timestamp: int,
@ -72,7 +77,7 @@ class UserspaceMemoryUsageHandler(MemoryUsageHandler):
**kwargs,
) -> None:
# Link event to handling method
handler_map = {
handler_map: HandlerMap = {
'lttng_ust_libc:malloc':
self._handle_malloc,
'lttng_ust_libc:calloc':
@ -196,7 +201,7 @@ class KernelMemoryUsageHandler(MemoryUsageHandler):
**kwargs,
) -> None:
# Link event to handling method
handler_map = {
handler_map: HandlerMap = {
'kmem_mm_page_alloc':
self._handle_malloc,
'kmem_mm_page_free':

View file

@ -24,7 +24,7 @@ from tracetools_read import get_field
from . import EventHandler
from . import EventMetadata
from . import HandlerMap
from ..data_model.profile import ProfileDataModel
@ -56,7 +56,7 @@ class ProfileHandler(EventHandler):
:param address_to_func: the mapping from function address (`int` or hex `str`) to name
"""
handler_map = {
handler_map: HandlerMap = {
'lttng_ust_cyg_profile_fast:func_entry':
self._handle_function_entry,
'lttng_ust_cyg_profile_fast:func_exit':
@ -95,6 +95,10 @@ class ProfileHandler(EventHandler):
'sched_switch',
}
@property
def data(self) -> ProfileDataModel:
return super().data # type: ignore
@staticmethod
def addr_to_int(addr: Union[int, str]) -> int:
"""Transform an address into an `int` if it's a hex `str`."""
@ -106,20 +110,22 @@ class ProfileHandler(EventHandler):
timestamp = metadata.timestamp
# If function(s) currently running stop(s) executing
prev_tid = get_field(event, 'prev_tid')
if prev_tid in self._current_funcs:
prev_info_list = self._current_funcs.get(prev_tid, None)
if prev_info_list is not None:
# Increment durations using last start timestamp
for info in self._current_funcs.get(prev_tid):
for info in prev_info_list:
last_start = info[2]
total_duration = info[3]
total_duration += timestamp - last_start
info[2] = None
info[2] = -1
info[3] = total_duration
# If stopped function(s) start(s) executing again
next_tid = get_field(event, 'next_tid')
if next_tid in self._current_funcs:
next_info_list = self._current_funcs.get(next_tid, None)
if next_info_list is not None:
# Set last start timestamp to now
for info in self._current_funcs.get(next_tid):
assert info[2] is None
for info in next_info_list:
assert info[2] == -1
info[2] = timestamp
def _handle_function_entry(
@ -153,9 +159,9 @@ class ProfileHandler(EventHandler):
self.data.add_duration(
tid,
function_depth,
function_name,
parent_name,
start_timestamp,
function_name, # type: ignore
parent_name, # type: ignore
start_timestamp, # type: ignore
duration,
actual_duration,
)

View file

@ -16,11 +16,13 @@
from typing import Dict
from typing import Set
from typing import Tuple
from tracetools_read import get_field
from . import EventHandler
from . import EventMetadata
from . import HandlerMap
from ..data_model.ros2 import Ros2DataModel
@ -37,7 +39,7 @@ class Ros2Handler(EventHandler):
) -> None:
"""Create a Ros2Handler."""
# Link a ROS trace event to its corresponding handling method
handler_map = {
handler_map: HandlerMap = {
'ros2:rcl_init':
self._handle_rcl_init,
'ros2:rcl_node_init':
@ -74,7 +76,7 @@ class Ros2Handler(EventHandler):
)
# Temporary buffers
self._callback_instances = {}
self._callback_instances: Dict[int, Tuple[Dict, EventMetadata]] = {}
@staticmethod
def required_events() -> Set[str]:
@ -82,6 +84,10 @@ class Ros2Handler(EventHandler):
'ros2:rcl_init',
}
@property
def data(self) -> Ros2DataModel:
return super().data # type: ignore
def _handle_rcl_init(
self, event: Dict, metadata: EventMetadata,
) -> None:
@ -207,8 +213,9 @@ class Ros2Handler(EventHandler):
) -> None:
# Fetch from dict
callback_object = get_field(event, 'callback')
(event_start, metadata_start) = self._callback_instances.get(callback_object)
if event_start is not None and metadata_start is not None:
callback_instance_data = self._callback_instances.get(callback_object)
if callback_instance_data is not None:
(event_start, metadata_start) = callback_instance_data
del self._callback_instances[callback_object]
duration = metadata.timestamp - metadata_start.timestamp
is_intra_process = get_field(event_start, 'is_intra_process', raise_if_not_found=False)

View file

@ -16,6 +16,7 @@
from datetime import datetime as dt
from typing import List
from typing import Optional
from typing import Union
from pandas import DataFrame
@ -43,7 +44,7 @@ class DataModelUtil():
self.__data = data_object.data if isinstance(data_object, EventHandler) else data_object
@property
def data(self) -> Union[DataModel, None]:
def data(self) -> Optional[DataModel]:
return self.__data
@staticmethod

View file

@ -37,6 +37,10 @@ class CpuTimeDataModelUtil(DataModelUtil):
"""
super().__init__(data_object)
@property
def data(self) -> CpuTimeDataModel:
return super().data # type: ignore
def get_time_per_thread(self) -> DataFrame:
"""Get a DataFrame of total duration for each thread."""
return self.data.times.loc[:, ['tid', 'duration']].groupby(by='tid').sum()

View file

@ -16,6 +16,8 @@
from collections import defaultdict
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from pandas import DataFrame
@ -66,12 +68,13 @@ class MemoryUsageDataModelUtil(DataModelUtil):
"""
suffixes = ['B', 'KB', 'MB', 'GB', 'TB']
suffixIndex = 0
while size > 1024 and suffixIndex < 4:
mem_size = float(size)
while mem_size > 1024.0 and suffixIndex < 4:
# Increment the index of the suffix
suffixIndex += 1
# Apply the division
size = size / 1024.0
return f'{size:.{precision}f} {suffixes[suffixIndex]}'
mem_size = mem_size / 1024.0
return f'{mem_size:.{precision}f} {suffixes[suffixIndex]}'
def get_max_memory_usage_per_tid(self) -> DataFrame:
"""
@ -79,50 +82,61 @@ class MemoryUsageDataModelUtil(DataModelUtil):
:return: dataframe with maximum memory usage (userspace & kernel) per tid
"""
if self.data_ust is not None:
tids_ust = None
tids_kernel = None
ust_memory_usage_dfs = self.get_absolute_userspace_memory_usage_by_tid()
if ust_memory_usage_dfs is not None:
tids_ust = set(ust_memory_usage_dfs.keys())
if self.data_kernel is not None:
kernel_memory_usage_dfs = self.get_absolute_kernel_memory_usage_by_tid()
if kernel_memory_usage_dfs is not None:
tids_kernel = set(kernel_memory_usage_dfs.keys())
# Use only the userspace tid values if available, otherwise use the kernel tid values
tids = tids_ust if self.data_ust is not None else tids_kernel
tids = tids_ust or tids_kernel
# Should not happen, since it is checked in __init__
if tids is None:
raise RuntimeError('no data')
data = [
[
tid,
self.format_size(ust_memory_usage_dfs[tid]['memory_usage'].max(), precision=1)
if self.data_ust is not None
if ust_memory_usage_dfs is not None
and ust_memory_usage_dfs.get(tid) is not None
else None,
self.format_size(kernel_memory_usage_dfs[tid]['memory_usage'].max(), precision=1)
if self.data_kernel is not None and ust_memory_usage_dfs.get(tid) is not None
if kernel_memory_usage_dfs is not None
and kernel_memory_usage_dfs.get(tid) is not None
else None,
]
for tid in tids
]
return DataFrame(data, columns=['tid', 'max_memory_usage_ust', 'max_memory_usage_kernel'])
def get_absolute_userspace_memory_usage_by_tid(self) -> Dict[int, DataFrame]:
def get_absolute_userspace_memory_usage_by_tid(self) -> Optional[Dict[int, DataFrame]]:
"""
Get absolute userspace memory usage over time per tid.
:return: (tid -> DataFrame of absolute memory usage over time)
"""
if self.data_ust is None:
return None
return self._get_absolute_memory_usage_by_tid(self.data_ust)
def get_absolute_kernel_memory_usage_by_tid(self) -> Dict[int, DataFrame]:
def get_absolute_kernel_memory_usage_by_tid(self) -> Optional[Dict[int, DataFrame]]:
"""
Get absolute kernel memory usage over time per tid.
:return: (tid -> DataFrame of absolute memory usage over time)
"""
if self.data_kernel is None:
return None
return self._get_absolute_memory_usage_by_tid(self.data_kernel)
def _get_absolute_memory_usage_by_tid(
self,
data_model: MemoryUsageDataModel,
) -> Dict[int, DataFrame]:
previous = defaultdict(int)
data = defaultdict(list)
previous: Dict[int, int] = defaultdict(int)
data: Dict[int, List[Dict[str, int]]] = defaultdict(list)
for index, row in data_model.memory_diff.iterrows():
timestamp = row['timestamp']
tid = int(row['tid'])

View file

@ -41,6 +41,10 @@ class ProfileDataModelUtil(DataModelUtil):
"""
super().__init__(data_object)
@property
def data(self) -> ProfileDataModel:
return super().data # type: ignore
def with_tid(
self,
tid: int,
@ -54,12 +58,12 @@ class ProfileDataModelUtil(DataModelUtil):
def get_call_tree(
self,
tid: int,
) -> Dict[str, List[str]]:
) -> Dict[str, Set[str]]:
depth_names = self.with_tid(tid)[
['depth', 'function_name', 'parent_name']
].drop_duplicates()
# print(depth_names.to_string())
tree = defaultdict(set)
tree: Dict[str, Set[str]] = defaultdict(set)
for _, row in depth_names.iterrows():
depth = row['depth']
name = row['function_name']

View file

@ -18,6 +18,7 @@
from typing import Any
from typing import List
from typing import Mapping
from typing import Optional
from typing import Union
from pandas import DataFrame
@ -41,6 +42,10 @@ class Ros2DataModelUtil(DataModelUtil):
"""
super().__init__(data_object)
@property
def data(self) -> Ros2DataModel:
return super().data # type: ignore
def _prettify(
self,
original: str,
@ -140,7 +145,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_node_tid_from_name(
self,
node_name: str,
) -> Union[int, None]:
) -> Optional[int]:
"""
Get the tid corresponding to a node.
@ -157,7 +162,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_node_names_from_tid(
self,
tid: str,
) -> Union[List[str], None]:
) -> Optional[List[str]]:
"""
Get the list of node names corresponding to a tid.
@ -171,7 +176,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_callback_owner_info(
self,
callback_obj: int,
) -> Union[str, None]:
) -> Optional[str]:
"""
Get information about the owner of a callback.
@ -207,9 +212,9 @@ class Ros2DataModelUtil(DataModelUtil):
type_name = 'Client'
info = self.get_client_handle_info(reference)
if info is not None:
info = f'{type_name} -- {self.format_info_dict(info)}'
return info
if info is None:
return None
return f'{type_name} -- {self.format_info_dict(info)}'
def get_timer_handle_info(
self,
@ -245,6 +250,8 @@ class Ros2DataModelUtil(DataModelUtil):
node_handle = self.data.publishers.loc[publisher_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
if node_handle_info is None:
return None
topic_name = self.data.publishers.loc[publisher_handle, 'topic_name']
publisher_info = {'topic': topic_name}
return {**node_handle_info, **publisher_info}
@ -252,7 +259,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_subscription_reference_info(
self,
subscription_reference: int,
) -> Union[Mapping[str, Any], None]:
) -> Optional[Mapping[str, Any]]:
"""
Get information about a subscription handle.
@ -297,6 +304,8 @@ class Ros2DataModelUtil(DataModelUtil):
node_handle = subscriptions_info.loc[subscription_reference, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
if node_handle_info is None:
return None
topic_name = subscriptions_info.loc[subscription_reference, 'topic_name']
subscription_info = {'topic': topic_name}
return {**node_handle_info, **subscription_info}
@ -304,7 +313,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_service_handle_info(
self,
service_handle: int,
) -> Union[Mapping[str, Any], None]:
) -> Optional[Mapping[str, Any]]:
"""
Get information about a service handle.
@ -316,6 +325,8 @@ class Ros2DataModelUtil(DataModelUtil):
node_handle = self.data.services.loc[service_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
if node_handle_info is None:
return None
service_name = self.data.services.loc[service_handle, 'service_name']
service_info = {'service': service_name}
return {**node_handle_info, **service_info}
@ -323,7 +334,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_client_handle_info(
self,
client_handle: int,
) -> Union[Mapping[str, Any], None]:
) -> Optional[Mapping[str, Any]]:
"""
Get information about a client handle.
@ -335,6 +346,8 @@ class Ros2DataModelUtil(DataModelUtil):
node_handle = self.data.clients.loc[client_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
if node_handle_info is None:
return None
service_name = self.data.clients.loc[client_handle, 'service_name']
service_info = {'service': service_name}
return {**node_handle_info, **service_info}
@ -342,7 +355,7 @@ class Ros2DataModelUtil(DataModelUtil):
def get_node_handle_info(
self,
node_handle: int,
) -> Union[Mapping[str, Any], None]:
) -> Optional[Mapping[str, Any]]:
"""
Get information about a node handle.