Merge branch 'required-events' into 'master'
Allow EventHandlers to provide list of required events See merge request micro-ROS/ros_tracing/tracetools_analysis!38
This commit is contained in:
commit
6b28456ac8
5 changed files with 132 additions and 4 deletions
|
@ -13,11 +13,13 @@
|
|||
# limitations under the License.
|
||||
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
import unittest
|
||||
|
||||
from tracetools_analysis.processor import EventHandler
|
||||
from tracetools_analysis.processor import EventMetadata
|
||||
from tracetools_analysis.processor import Processor
|
||||
from tracetools_analysis.processor import RequiredEventNotFoundError
|
||||
|
||||
|
||||
class StubHandler1(EventHandler):
|
||||
|
@ -64,6 +66,47 @@ class WrongHandler(EventHandler):
|
|||
pass
|
||||
|
||||
|
||||
class MissingEventHandler(EventHandler):
|
||||
|
||||
def __init__(self) -> None:
|
||||
handler_map = {
|
||||
'myeventname': self._handler_whatever,
|
||||
}
|
||||
super().__init__(handler_map=handler_map)
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
return [
|
||||
'no_handler_for_this',
|
||||
'myeventname',
|
||||
]
|
||||
|
||||
def _handler_whatever(
|
||||
self, event: Dict, metadata: EventMetadata
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class EventHandlerWithRequiredEvent(EventHandler):
|
||||
|
||||
def __init__(self) -> None:
|
||||
handler_map = {
|
||||
'myrequiredevent': self._handler_whatever,
|
||||
}
|
||||
super().__init__(handler_map=handler_map)
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
return [
|
||||
'myrequiredevent',
|
||||
]
|
||||
|
||||
def _handler_whatever(
|
||||
self, event: Dict, metadata: EventMetadata
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class TestProcessor(unittest.TestCase):
|
||||
|
||||
def __init__(self, *args) -> None:
|
||||
|
@ -95,6 +138,28 @@ class TestProcessor(unittest.TestCase):
|
|||
self.assertTrue(handler1.handler_called, 'event handler not called')
|
||||
self.assertTrue(handler2.handler_called, 'event handler not called')
|
||||
|
||||
def test_assert_handler_functions_for_required_events(self) -> None:
|
||||
with self.assertRaises(AssertionError):
|
||||
MissingEventHandler()
|
||||
|
||||
def test_check_required_events(self) -> None:
|
||||
mock_event = {
|
||||
'_name': 'myeventname',
|
||||
'_timestamp': 0,
|
||||
'cpu_id': 0,
|
||||
}
|
||||
# Fails check
|
||||
with self.assertRaises(RequiredEventNotFoundError):
|
||||
Processor(EventHandlerWithRequiredEvent()).process([mock_event])
|
||||
|
||||
required_mock_event = {
|
||||
'_name': 'myrequiredevent',
|
||||
'_timestamp': 69,
|
||||
'cpu_id': 0,
|
||||
}
|
||||
# Passes check
|
||||
Processor(EventHandlerWithRequiredEvent()).process([required_mock_event, mock_event])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -123,6 +123,7 @@ class EventHandler(Dependant):
|
|||
"""
|
||||
assert handler_map is not None and len(handler_map) > 0, \
|
||||
f'empty map: {self.__class__.__name__}'
|
||||
assert all(required_name in handler_map.keys() for required_name in self.required_events())
|
||||
self._handler_map = handler_map
|
||||
self.processor = None
|
||||
|
||||
|
@ -136,6 +137,16 @@ class EventHandler(Dependant):
|
|||
"""Get the data model."""
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
"""
|
||||
Get the list of events required by this EventHandler.
|
||||
|
||||
Without these events, the EventHandler would be invalid/useless. Inheriting classes can
|
||||
decide not to declare that they require specific events.
|
||||
"""
|
||||
return []
|
||||
|
||||
def register_processor(self, processor: 'Processor') -> None:
|
||||
"""Register processor with this `EventHandler` so that it can query other handlers."""
|
||||
self.processor = processor
|
||||
|
@ -242,6 +253,10 @@ class DependencySolver():
|
|||
visited.add(dep_type)
|
||||
|
||||
|
||||
class RequiredEventNotFoundError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
class Processor():
|
||||
"""Processor class, which dispatches events to event handlers."""
|
||||
|
||||
|
@ -257,11 +272,11 @@ class Processor():
|
|||
:param kwargs: the parameters to pass on to new handlers
|
||||
"""
|
||||
self._initial_handlers = list(handlers)
|
||||
expanded_handlers = self._expand_dependencies(*handlers, **kwargs)
|
||||
self._handler_multimap = self._get_handler_maps(expanded_handlers)
|
||||
self._register_with_handlers(expanded_handlers)
|
||||
self._expanded_handlers = self._expand_dependencies(*handlers, **kwargs)
|
||||
self._handler_multimap = self._get_handler_maps(self._expanded_handlers)
|
||||
self._register_with_handlers(self._expanded_handlers)
|
||||
self._progress_display = ProcessingProgressDisplay(
|
||||
[type(handler).__name__ for handler in expanded_handlers],
|
||||
[type(handler).__name__ for handler in self._expanded_handlers],
|
||||
)
|
||||
self._processing_done = False
|
||||
|
||||
|
@ -306,17 +321,35 @@ class Processor():
|
|||
for handler in handlers:
|
||||
handler.register_processor(self)
|
||||
|
||||
def _check_required_events(
|
||||
self,
|
||||
events: List[DictEvent],
|
||||
) -> None:
|
||||
event_names = {get_event_name(event) for event in events}
|
||||
# Check names separately so that we can know which event from which handler is missing
|
||||
for handler in self._expanded_handlers:
|
||||
for name in handler.required_events():
|
||||
if name not in event_names:
|
||||
raise RequiredEventNotFoundError(
|
||||
f'missing event {name} for {handler.__class__.__name__}'
|
||||
)
|
||||
|
||||
def process(
|
||||
self,
|
||||
events: List[DictEvent],
|
||||
erase_progress: bool = False,
|
||||
no_required_events_check: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Process all events.
|
||||
|
||||
:param events: the events to process
|
||||
:param erase_progress: whether to erase the progress message
|
||||
:param no_required_events_check: whether to skip the check for required events
|
||||
"""
|
||||
if not no_required_events_check:
|
||||
self._check_required_events(events)
|
||||
|
||||
if not self._processing_done:
|
||||
self._progress_display.set_work_total(len(events))
|
||||
for event in events:
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
"""Module for CPU time events processing."""
|
||||
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
|
||||
from tracetools_read import get_field
|
||||
|
||||
|
@ -51,6 +52,12 @@ class CpuTimeHandler(EventHandler):
|
|||
# cpu_id -> start timestamp of the running thread
|
||||
self._cpu_start: Dict[int, int] = {}
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
return [
|
||||
'sched_switch',
|
||||
]
|
||||
|
||||
@property
|
||||
def data(self) -> CpuTimeDataModel:
|
||||
return self._data_model
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
"""Module for memory usage events processing."""
|
||||
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
|
||||
from tracetools_read import get_field
|
||||
|
||||
|
@ -96,6 +97,13 @@ class UserspaceMemoryUsageHandler(MemoryUsageHandler):
|
|||
# (used to know keep track of the memory size allocated at a given pointer)
|
||||
self._memory: Dict[int, int] = {}
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
return [
|
||||
'lttng_ust_libc:malloc',
|
||||
'lttng_ust_libc:free',
|
||||
]
|
||||
|
||||
def _handle_malloc(
|
||||
self, event: Dict, metadata: EventMetadata
|
||||
) -> None:
|
||||
|
@ -200,6 +208,13 @@ class KernelMemoryUsageHandler(MemoryUsageHandler):
|
|||
**kwargs,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
return [
|
||||
'kmem_mm_page_alloc',
|
||||
'kmem_mm_page_free',
|
||||
]
|
||||
|
||||
def _handle_malloc(
|
||||
self, event: Dict, metadata: EventMetadata
|
||||
) -> None:
|
||||
|
|
|
@ -86,6 +86,14 @@ class ProfileHandler(EventHandler):
|
|||
# ]
|
||||
self._current_funcs: Dict[int, List[List[Union[str, int]]]] = defaultdict(list)
|
||||
|
||||
@staticmethod
|
||||
def required_events() -> List[str]:
|
||||
return [
|
||||
'lttng_ust_cyg_profile_fast:func_entry',
|
||||
'lttng_ust_cyg_profile_fast:func_exit',
|
||||
'sched_switch',
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def addr_to_int(addr: Union[int, str]) -> int:
|
||||
"""Transform an address into an `int` if it's a hex `str`."""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue