Add typing

This commit is contained in:
Christophe Bedard 2019-06-14 14:14:13 +02:00
parent 3fc21e7e94
commit c4bd35def9
6 changed files with 85 additions and 68 deletions

View file

@ -12,7 +12,7 @@ class DataModel():
It uses pandas DataFrames directly.
"""
def __init__(self):
def __init__(self) -> None:
# Objects (one-time events, usually when something is created)
self.contexts = pd.DataFrame(columns=['context_handle',
'timestamp',
@ -71,34 +71,34 @@ class DataModel():
'duration',
'intra_process'])
def add_context(self, context_handle, timestamp, pid):
def add_context(self, context_handle, timestamp, pid) -> None:
self.contexts.loc[context_handle] = [timestamp, pid]
def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace):
def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace) -> None:
self.nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace]
def add_publisher(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth):
def add_publisher(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth) -> None:
self.publishers.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
def add_subscription(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth):
def add_subscription(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth) -> None:
self.subscriptions.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
def add_service(self, handle, timestamp, node_handle, rmw_handle, service_name):
def add_service(self, handle, timestamp, node_handle, rmw_handle, service_name) -> None:
self.services.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
def add_client(self, handle, timestamp, node_handle, rmw_handle, service_name):
def add_client(self, handle, timestamp, node_handle, rmw_handle, service_name) -> None:
self.clients.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
def add_timer(self, handle, timestamp, period):
def add_timer(self, handle, timestamp, period) -> None:
self.timers.loc[handle] = [timestamp, period]
def add_callback_object(self, handle, timestamp, callback_object):
def add_callback_object(self, handle, timestamp, callback_object) -> None:
self.callback_objects.loc[handle] = [timestamp, callback_object]
def add_callback_symbol(self, callback_object, timestamp, symbol):
def add_callback_symbol(self, callback_object, timestamp, symbol) -> None:
self.callback_symbols.loc[callback_object] = [timestamp, symbol]
def add_callback_instance(self, callback_object, timestamp, duration, intra_process):
def add_callback_instance(self, callback_object, timestamp, duration, intra_process) -> None:
data = {
'callback_object': callback_object,
'timestamp': timestamp,
@ -107,7 +107,7 @@ class DataModel():
}
self.callback_instances = self.callback_instances.append(data, ignore_index=True)
def print_model(self):
def print_model(self) -> None:
"""Debug method to print every contained df."""
print('====================DATA MODEL====================')
print(f'Contexts:\n{self.contexts.to_string()}')

View file

@ -1,47 +1,52 @@
# Event handler
import sys
from typing import Callable
from typing import Dict
from typing import List
from . import lttng_models
from .lttng_models import EventMetadata
from .lttng_models import get_field
from .lttng_models import get_name
class EventHandler():
"""Base event handling class."""
def __init__(self, handler_map):
def __init__(self, handler_map: Dict[str, Callable[[Dict, EventMetadata], None]]) -> None:
"""
Constructor.
:param handler_map (map(str: function)): the mapping from event name to handling method
:param handler_map: the mapping from event name to handling method
"""
self._handler_map = handler_map
def handle_events(self, events):
def handle_events(self, events: List[Dict[str, str]]) -> None:
"""
Handle events by calling their handlers.
:param events (list(dict(str:str))): the events to process
:param events: the events to process
"""
for event in events:
self._handle(event)
def _handle(self, event):
event_name = lttng_models.get_name(event)
def _handle(self, event: Dict[str, str]) -> None:
event_name = get_name(event)
handler_function = self._handler_map.get(event_name, None)
if handler_function is not None:
pid = lttng_models.get_field(event,
'vpid',
default=lttng_models.get_field(event,
'pid',
raise_if_not_found=False))
tid = lttng_models.get_field(event,
'vtid',
default=lttng_models.get_field(event,
'tid',
raise_if_not_found=False))
timestamp = lttng_models.get_field(event, '_timestamp')
procname = lttng_models.get_field(event, 'procname')
metadata = lttng_models.EventMetadata(event_name, pid, tid, timestamp, procname)
pid = get_field(event,
'vpid',
default=get_field(event,
'pid',
raise_if_not_found=False))
tid = get_field(event,
'vtid',
default=get_field(event,
'tid',
raise_if_not_found=False))
timestamp = get_field(event, '_timestamp')
procname = get_field(event, 'procname')
metadata = EventMetadata(event_name, pid, tid, timestamp, procname)
handler_function(event, metadata)
else:
print(f'unhandled event name: {event_name}', file=sys.stderr)

View file

@ -1,12 +1,14 @@
import pickle
from typing import Dict
from typing import List
def load_pickle(pickle_file_path):
def load_pickle(pickle_file_path: str) -> List[Dict]:
"""
Load pickle file containing converted trace events.
:param pickle_file_path (str): the path to the pickle file to load
:return list(dict): the list of events (dicts) read from the file
:param pickle_file_path: the path to the pickle file to load
:return: the list of events read from the file
"""
events = []
with open(pickle_file_path, 'rb') as f:

View file

@ -1,7 +1,10 @@
# Model objects for LTTng traces/events
from typing import Any
from typing import Dict
def get_field(event, field_name, default=None, raise_if_not_found=True):
def get_field(event: Dict, field_name: str, default=None, raise_if_not_found=True) -> Any:
field_value = event.get(field_name, default)
# If enabled, raise exception as soon as possible to avoid headaches
if raise_if_not_found and field_value is None:
@ -9,14 +12,14 @@ def get_field(event, field_name, default=None, raise_if_not_found=True):
return field_value
def get_name(event):
def get_name(event: Dict) -> str:
return get_field(event, '_name')
class EventMetadata():
"""Container for event metadata."""
def __init__(self, event_name, pid, tid, timestamp, procname):
def __init__(self, event_name, pid, tid, timestamp, procname) -> None:
self._event_name = event_name
self._pid = pid
self._tid = tid

View file

@ -1,20 +1,12 @@
# Process trace events and create ROS model
from typing import Dict
from typing import List
from .data_model import DataModel
from .handler import EventHandler
from .lttng_models import get_field
def ros2_process(events):
"""
Process unpickled events and create ROS 2 model.
:param events (list(dict(str:str:))): the list of events
:return the processor object
"""
processor = Ros2Processor()
processor.handle_events(events)
return processor
from .lttng_models import EventMetadata
class Ros2Processor(EventHandler):
@ -24,7 +16,7 @@ class Ros2Processor(EventHandler):
Handles a trace's events and builds a model with the data.
"""
def __init__(self):
def __init__(self) -> None:
# Link a ROS trace event to its corresponding handling method
handler_map = {
'ros2:rcl_init':
@ -61,16 +53,16 @@ class Ros2Processor(EventHandler):
# Temporary buffers
self._callback_instances = {}
def get_data_model(self):
def get_data_model(self) -> DataModel:
return self._data
def _handle_rcl_init(self, event, metadata):
def _handle_rcl_init(self, event: Dict, metadata: EventMetadata) -> None:
context_handle = get_field(event, 'context_handle')
timestamp = metadata.timestamp
pid = metadata.pid
self._data.add_context(context_handle, timestamp, pid)
def _handle_rcl_node_init(self, event, metadata):
def _handle_rcl_node_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'node_handle')
timestamp = metadata.timestamp
tid = metadata.tid
@ -79,7 +71,7 @@ class Ros2Processor(EventHandler):
namespace = get_field(event, 'namespace')
self._data.add_node(handle, timestamp, tid, rmw_handle, name, namespace)
def _handle_rcl_publisher_init(self, event, metadata):
def _handle_rcl_publisher_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'publisher_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
@ -88,7 +80,7 @@ class Ros2Processor(EventHandler):
depth = get_field(event, 'depth')
self._data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
def _handle_subscription_init(self, event, metadata):
def _handle_subscription_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'subscription_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
@ -97,13 +89,13 @@ class Ros2Processor(EventHandler):
depth = get_field(event, 'depth')
self._data.add_subscription(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
def _handle_rclcpp_subscription_callback_added(self, event, metadata):
def _handle_rclcpp_subscription_callback_added(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'subscription_handle')
timestamp = metadata.timestamp
callback_object = get_field(event, 'callback')
self._data.add_callback_object(handle, timestamp, callback_object)
def _handle_rcl_service_init(self, event, metadata):
def _handle_rcl_service_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'service_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
@ -111,13 +103,13 @@ class Ros2Processor(EventHandler):
service_name = get_field(event, 'service_name')
self._data.add_service(handle, timestamp, node_handle, rmw_handle, service_name)
def _handle_rclcpp_service_callback_added(self, event, metadata):
def _handle_rclcpp_service_callback_added(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'service_handle')
timestamp = metadata.timestamp
callback_object = get_field(event, 'callback')
self._data.add_callback_object(handle, timestamp, callback_object)
def _handle_rcl_client_init(self, event, metadata):
def _handle_rcl_client_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'client_handle')
timestamp = metadata.timestamp
node_handle = get_field(event, 'node_handle')
@ -125,30 +117,30 @@ class Ros2Processor(EventHandler):
service_name = get_field(event, 'service_name')
self._data.add_client(handle, timestamp, node_handle, rmw_handle, service_name)
def _handle_rcl_timer_init(self, event, metadata):
def _handle_rcl_timer_init(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'timer_handle')
timestamp = metadata.timestamp
period = get_field(event, 'period')
self._data.add_timer(handle, timestamp, period)
def _handle_rclcpp_timer_callback_added(self, event, metadata):
def _handle_rclcpp_timer_callback_added(self, event: Dict, metadata: EventMetadata) -> None:
handle = get_field(event, 'timer_handle')
timestamp = metadata.timestamp
callback_object = get_field(event, 'callback')
self._data.add_callback_object(handle, timestamp, callback_object)
def _handle_rclcpp_callback_register(self, event, metadata):
def _handle_rclcpp_callback_register(self, event: Dict, metadata: EventMetadata) -> None:
callback_object = get_field(event, 'callback')
timestamp = metadata.timestamp
symbol = get_field(event, 'symbol')
self._data.add_callback_symbol(callback_object, timestamp, symbol)
def _handle_callback_start(self, event, metadata):
def _handle_callback_start(self, event: Dict, metadata: EventMetadata) -> None:
# Add to dict
callback_addr = get_field(event, 'callback')
self._callback_instances[callback_addr] = (event, metadata)
def _handle_callback_end(self, event, metadata):
def _handle_callback_end(self, event: Dict, metadata: EventMetadata) -> None:
# Fetch from dict
callback_object = get_field(event, 'callback')
(event_start, metadata_start) = self._callback_instances.get(callback_object)
@ -162,3 +154,15 @@ class Ros2Processor(EventHandler):
bool(is_intra_process))
else:
print(f'No matching callback start for callback object "{callback_object}"')
def ros2_process(events: List[Dict[str, str]]) -> Ros2Processor:
"""
Process unpickled events and create ROS 2 model.
:param events: the list of events
:return: the processor object
"""
processor = Ros2Processor()
processor.handle_events(events)
return processor

View file

@ -1,5 +1,7 @@
# CTF to pickle conversion
from pickle import Pickler
import babeltrace
# List of ignored CTF fields
@ -10,12 +12,13 @@ _IGNORED_FIELDS = [
_DISCARD = 'events_discarded'
def ctf_to_pickle(trace_directory, target):
def ctf_to_pickle(trace_directory: str, target: Pickler) -> int:
"""
Load CTF trace and convert to a pickle file.
:param trace_directory (str): the main/top trace directory
:param target (Pickler): the target pickle file to write to
:param trace_directory: the main/top trace directory
:param target: the target pickle file to write to
:return: the number of events written
"""
# add traces
tc = babeltrace.TraceCollection()