Merge branch 'auto-processor' into 'master'

Add automatic processor

See merge request micro-ROS/ros_tracing/tracetools_analysis!40
This commit is contained in:
Christophe Bedard 2020-01-01 00:33:13 +00:00
commit d0229a0bed
8 changed files with 363 additions and 18 deletions

View file

@ -43,6 +43,7 @@ setup(
'console_scripts': [
f'convert = {package_name}.convert:main',
f'process = {package_name}.process:main',
f'auto = {package_name}.scripts.auto:main',
f'cb_durations = {package_name}.scripts.cb_durations:main',
f'memory_usage = {package_name}.scripts.memory_usage:main',
],

View file

@ -0,0 +1,180 @@
# Copyright 2019 Apex.AI, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict
from typing import List
import unittest
from tracetools_analysis.processor import AutoProcessor
from tracetools_analysis.processor import EventHandler
from tracetools_analysis.processor import EventMetadata
class AbstractEventHandler(EventHandler):
def __init__(self, **kwargs) -> None:
if type(self) is AbstractEventHandler:
raise RuntimeError()
super().__init__(**kwargs)
class SubSubEventHandler(AbstractEventHandler):
def __init__(self) -> None:
handler_map = {
'myeventname': self._handler_whatever,
'myeventname69': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
'myeventname',
'myeventname69',
]
def _handler_whatever(
self, event: Dict, metadata: EventMetadata
) -> None:
pass
class SubSubEventHandler2(AbstractEventHandler):
def __init__(self) -> None:
handler_map = {
'myeventname2': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
'myeventname2',
]
def _handler_whatever(
self, event: Dict, metadata: EventMetadata
) -> None:
pass
class SubEventHandler(EventHandler):
def __init__(self) -> None:
handler_map = {
'myeventname3': self._handler_whatever,
}
super().__init__(handler_map=handler_map)
@staticmethod
def required_events() -> List[str]:
return [
'myeventname3',
]
def _handler_whatever(
self, event: Dict, metadata: EventMetadata
) -> None:
pass
class TestAutoProcessor(unittest.TestCase):
def __init__(self, *args) -> None:
super().__init__(
*args,
)
def test_separate_methods(self) -> None:
# Testing logic/methods separately, since we don't actually want to process
# Getting subclasses
subclasses = AutoProcessor._get_subclasses(EventHandler)
# Will also contain the real classes
self.assertTrue(
all(
handler in subclasses
for handler in [
AbstractEventHandler,
SubSubEventHandler,
SubSubEventHandler2,
SubEventHandler,
]
)
)
# Finding applicable classes
event_names = {
'myeventname',
'myeventname2',
'myeventname3',
}
applicable_handler_classes = AutoProcessor._get_applicable_event_handler_classes(
event_names,
subclasses,
)
self.assertTrue(
all(
handler in applicable_handler_classes
for handler in [
AbstractEventHandler,
SubSubEventHandler2,
SubEventHandler,
]
) and
SubSubEventHandler not in applicable_handler_classes
)
# Creating instances
instances = AutoProcessor._get_event_handler_instances(applicable_handler_classes)
for instance in instances:
self.assertTrue(type(instance) is not AbstractEventHandler)
def test_all(self) -> None:
# Test the main method with all the logic
events = [
{
'_name': 'myeventname',
'_timestamp': 0,
'cpu_id': 0,
},
{
'_name': 'myeventname2',
'_timestamp': 69,
'cpu_id': 0,
},
{
'_name': 'myeventname3',
'_timestamp': 6969,
'cpu_id': 0,
},
]
instances = AutoProcessor.get_applicable_event_handlers(events)
for instance in instances:
self.assertTrue(type(instance) is not AbstractEventHandler)
# Will also contain the real classes
self.assertEqual(
sum(
isinstance(instance, handler_class)
for handler_class in [SubEventHandler, SubSubEventHandler2]
for instance in instances
),
2,
)
if __name__ == '__main__':
unittest.main()

View file

@ -138,14 +138,14 @@ class EventHandler(Dependant):
return None
@staticmethod
def required_events() -> List[str]:
def required_events() -> Set[str]:
"""
Get the list of events required by this EventHandler.
Get the set of events required by this EventHandler.
Without these events, the EventHandler would be invalid/useless. Inheriting classes can
decide not to declare that they require specific events.
"""
return []
return {}
def register_processor(self, processor: 'Processor') -> None:
"""Register processor with this `EventHandler` so that it can query other handlers."""
@ -272,6 +272,8 @@ class Processor():
:param kwargs: the parameters to pass on to new handlers
"""
self._initial_handlers = list(handlers)
if len(self._initial_handlers) == 0:
raise RuntimeError('Must provide at least one handler!')
self._expanded_handlers = self._expand_dependencies(*handlers, **kwargs)
self._handler_multimap = self._get_handler_maps(self._expanded_handlers)
self._register_with_handlers(self._expanded_handlers)
@ -321,11 +323,18 @@ class Processor():
for handler in handlers:
handler.register_processor(self)
@staticmethod
def get_event_names(
events: List[DictEvent],
) -> Set[str]:
"""Get set of names from a list of events."""
return {get_event_name(event) for event in events}
def _check_required_events(
self,
events: List[DictEvent],
) -> None:
event_names = {get_event_name(event) for event in events}
event_names = self.get_event_names(events)
# Check names separately so that we can know which event from which handler is missing
for handler in self._expanded_handlers:
for name in handler.required_events():
@ -396,6 +405,125 @@ class Processor():
handler.data.print_data()
class AutoProcessor():
"""
Automatic processor, which takes a list of events and enables all relevant handlers.
It checks each existing EventHandler, and, if its required events are in the events list, it
uses that handler.
"""
def __init__(
self,
events: List[DictEvent],
**kwargs,
) -> None:
"""
Create an AutoProcessor.
:param events: the list of events to process
:param kwargs: the kwargs to provide when instanciating EventHandler subclasses
"""
self.handlers = self.get_applicable_event_handlers(events)
Processor(
*self.handlers,
**kwargs,
).process(events)
def print_data(self) -> None:
"""Print data models of all handlers."""
for handler in self.handlers:
handler.data.print_data()
@staticmethod
def get_applicable_event_handlers(
events: List[DictEvent],
) -> List[EventHandler]:
"""
Get applicable EventHandler instances for a list of events.
:param events: the list of events
:return the concrete EventHandler instances which are applicable
"""
event_names = Processor.get_event_names(events)
# Force import of all processor submodules (i.e. files) so that we can find all
# EventHandler subclasses
AutoProcessor._import_event_handler_submodules()
all_handler_classes = AutoProcessor._get_subclasses(EventHandler)
applicable_handler_classes = AutoProcessor._get_applicable_event_handler_classes(
event_names,
all_handler_classes,
)
return AutoProcessor._get_event_handler_instances(applicable_handler_classes)
@staticmethod
def _get_applicable_event_handler_classes(
event_names: List[str],
handler_classes: List[Type[EventHandler]],
) -> Set[Type[EventHandler]]:
"""
Get applicable EventHandler subclasses for a list of event names.
:param event_names: the list of event names
:return: a list of EventHandler subclasses for which requirements are met
"""
return {
handler for handler in handler_classes
if set(handler.required_events()).issubset(event_names)
}
@staticmethod
def _get_event_handler_instances(
handler_classes: Set[Type[EventHandler]],
**kwargs,
) -> List[EventHandler]:
"""
Create instances from a list of EventHandlers (sub)classes.
:param handler_classes: the list of EventHandler subclasses
:param kwargs: the kwargs to provide when instanciating EventHandler subclasses
:return: the list of concrete instances
"""
# Doing this manually to catch exceptions, e.g. when a given EventHandler subclass is
# abstract and thus should not be instantiated
handlers = []
for handler_class in handler_classes:
try:
instance = handler_class(**kwargs)
handlers.append(instance)
except RuntimeError:
pass
return handlers
@staticmethod
def _get_subclasses(
cls: Type,
) -> Set[Type]:
"""Get all subclasses of a class recursively."""
return set(cls.__subclasses__()) | {
subsubcls
for subcls in cls.__subclasses__()
for subsubcls in AutoProcessor._get_subclasses(subcls)
}
@staticmethod
def _import_event_handler_submodules(
name: str = __name__,
recursive=True,
):
"""Force import of EventHandler submodules."""
import importlib
import pkgutil
package = importlib.import_module(name)
results = {}
for loader, name, is_pkg in pkgutil.walk_packages(package.__path__):
full_name = package.__name__ + '.' + name
results[full_name] = importlib.import_module(full_name)
if recursive and is_pkg:
results.update(AutoProcessor._import_event_handler_submodules(full_name))
return results
class ProcessingProgressDisplay():
"""Display processing progress periodically on stdout."""

View file

@ -15,7 +15,7 @@
"""Module for CPU time events processing."""
from typing import Dict
from typing import List
from typing import Set
from tracetools_read import get_field
@ -53,10 +53,10 @@ class CpuTimeHandler(EventHandler):
self._cpu_start: Dict[int, int] = {}
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'sched_switch',
]
}
@property
def data(self) -> CpuTimeDataModel:

View file

@ -15,7 +15,7 @@
"""Module for memory usage events processing."""
from typing import Dict
from typing import List
from typing import Set
from tracetools_read import get_field
@ -31,6 +31,8 @@ class MemoryUsageHandler(EventHandler):
self,
**kwargs,
) -> None:
if type(self) is MemoryUsageHandler:
raise RuntimeError('Do not instanciate MemoryUsageHandler directly!')
super().__init__(**kwargs)
self._data_model = MemoryUsageDataModel()
@ -98,11 +100,11 @@ class UserspaceMemoryUsageHandler(MemoryUsageHandler):
self._memory: Dict[int, int] = {}
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'lttng_ust_libc:malloc',
'lttng_ust_libc:free',
]
}
def _handle_malloc(
self, event: Dict, metadata: EventMetadata
@ -209,11 +211,11 @@ class KernelMemoryUsageHandler(MemoryUsageHandler):
)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'kmem_mm_page_alloc',
'kmem_mm_page_free',
]
}
def _handle_malloc(
self, event: Dict, metadata: EventMetadata

View file

@ -17,6 +17,7 @@
from collections import defaultdict
from typing import Dict
from typing import List
from typing import Set
from typing import Union
from tracetools_read import get_field
@ -87,12 +88,12 @@ class ProfileHandler(EventHandler):
self._current_funcs: Dict[int, List[List[Union[str, int]]]] = defaultdict(list)
@staticmethod
def required_events() -> List[str]:
return [
def required_events() -> Set[str]:
return {
'lttng_ust_cyg_profile_fast:func_entry',
'lttng_ust_cyg_profile_fast:func_exit',
'sched_switch',
]
}
@staticmethod
def addr_to_int(addr: Union[int, str]) -> int:

View file

@ -15,6 +15,7 @@
"""Module for trace events processor and ROS model creation."""
from typing import Dict
from typing import Set
from tracetools_read import get_field
@ -76,6 +77,12 @@ class Ros2Handler(EventHandler):
# Temporary buffers
self._callback_instances = {}
@staticmethod
def required_events() -> Set[str]:
return {
'ros2:rcl_init',
}
@property
def data(self) -> Ros2DataModel:
return self._data_model

View file

@ -0,0 +1,26 @@
# Copyright 2019 Apex.AI, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tracetools_analysis.loading import load_file
from tracetools_analysis.processor import AutoProcessor
from . import get_input_path
def main():
input_path = get_input_path()
events = load_file(input_path)
processor = AutoProcessor(events)
processor.print_data()