tracetools_analysis/analysis/ros_processor.py

73 lines
2.8 KiB
Python
Raw Normal View History

# Process trace events and create ROS model
from .lttng_models import EventMetadata, get_field, get_name
def ros_process(events):
"""
Process unpickled events and create ROS model
:param events (list(dict(str:str:))): the list of events
:return the processor object
"""
processor = RosProcessor()
processor.process_events(events)
return processor
class RosProcessor():
2019-06-06 09:22:21 +02:00
"""
ROS-aware event processing/handling class.
Handles a trace's events and builds a model with the data.
"""
def __init__(self):
# TODO add other stuff
# Instances of callback_start for eventual matching
self._callback_starts = {}
# Callback instances, callback_address: end - start
self.callbacks_instances = {}
# Link a ROS trace event to its corresponding handling method
self._handler_map = {
'ros2:rcl_subscription_init': self._handle_subscription_init,
'ros2:rclcpp_subscription_callback_added': self._handle_subscription_callback_added,
'ros2:rclcpp_subscription_callback_start': self._handle_subscription_callback_start,
'ros2:rclcpp_subscription_callback_end': self._handle_subscription_callback_end,
}
def process_events(self, events):
"""
Process events
:param events (list(dict(str:str))): the events to process
"""
for event in events:
self._handle(event)
def _handle(self, event):
handler_function = self._handler_map.get(get_name(event), None)
if handler_function is not None:
name = get_name(event)
pid = get_field(event, 'vpid', default=get_field(event, 'pid'))
tid = get_field(event, 'vtid', default=get_field(event, 'tid'))
timestamp = get_field(event, '_timestamp')
procname = get_field(event, 'procname')
metadata = EventMetadata(name, pid, tid, timestamp, procname)
handler_function(event, metadata)
def _handle_subscription_init(self, event, metadata):
# TODO
pass
def _handle_subscription_callback_added(self, event, metadata):
# Add the callback address key and create an empty list
callback_addr = get_field(event, 'callback')
self.callbacks_instances[callback_addr] = []
def _handle_subscription_callback_start(self, event, metadata):
callback_addr = get_field(event, 'callback')
self._callback_starts[callback_addr] = metadata.timestamp
def _handle_subscription_callback_end(self, event, metadata):
callback_addr = get_field(event, 'callback')
start_timestamp = self._callback_starts.pop(callback_addr, None)
if start_timestamp is not None:
duration = metadata.timestamp - start_timestamp
self.callbacks_instances[callback_addr].append(duration)