diff --git a/tracetools_analysis/analysis/data_model.py b/tracetools_analysis/analysis/data_model.py index 7707af0..7eb7ad6 100644 --- a/tracetools_analysis/analysis/data_model.py +++ b/tracetools_analysis/analysis/data_model.py @@ -12,7 +12,7 @@ class DataModel(): It uses pandas DataFrames directly. """ - def __init__(self): + def __init__(self) -> None: # Objects (one-time events, usually when something is created) self.contexts = pd.DataFrame(columns=['context_handle', 'timestamp', @@ -71,34 +71,34 @@ class DataModel(): 'duration', 'intra_process']) - def add_context(self, context_handle, timestamp, pid): + def add_context(self, context_handle, timestamp, pid) -> None: self.contexts.loc[context_handle] = [timestamp, pid] - def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace): + def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace) -> None: self.nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace] - def add_publisher(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth): + def add_publisher(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth) -> None: self.publishers.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth] - def add_subscription(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth): + def add_subscription(self, handle, timestamp, node_handle, rmw_handle, topic_name, depth) -> None: self.subscriptions.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth] - def add_service(self, handle, timestamp, node_handle, rmw_handle, service_name): + def add_service(self, handle, timestamp, node_handle, rmw_handle, service_name) -> None: self.services.loc[handle] = [timestamp, node_handle, rmw_handle, service_name] - def add_client(self, handle, timestamp, node_handle, rmw_handle, service_name): + def add_client(self, handle, timestamp, node_handle, rmw_handle, service_name) -> None: self.clients.loc[handle] = [timestamp, node_handle, rmw_handle, service_name] - def add_timer(self, handle, timestamp, period): + def add_timer(self, handle, timestamp, period) -> None: self.timers.loc[handle] = [timestamp, period] - def add_callback_object(self, handle, timestamp, callback_object): + def add_callback_object(self, handle, timestamp, callback_object) -> None: self.callback_objects.loc[handle] = [timestamp, callback_object] - def add_callback_symbol(self, callback_object, timestamp, symbol): + def add_callback_symbol(self, callback_object, timestamp, symbol) -> None: self.callback_symbols.loc[callback_object] = [timestamp, symbol] - def add_callback_instance(self, callback_object, timestamp, duration, intra_process): + def add_callback_instance(self, callback_object, timestamp, duration, intra_process) -> None: data = { 'callback_object': callback_object, 'timestamp': timestamp, @@ -107,7 +107,7 @@ class DataModel(): } self.callback_instances = self.callback_instances.append(data, ignore_index=True) - def print_model(self): + def print_model(self) -> None: """Debug method to print every contained df.""" print('====================DATA MODEL====================') print(f'Contexts:\n{self.contexts.to_string()}') diff --git a/tracetools_analysis/analysis/handler.py b/tracetools_analysis/analysis/handler.py index 84b1859..98e4b34 100644 --- a/tracetools_analysis/analysis/handler.py +++ b/tracetools_analysis/analysis/handler.py @@ -1,47 +1,52 @@ # Event handler import sys +from typing import Callable +from typing import Dict +from typing import List -from . import lttng_models +from .lttng_models import EventMetadata +from .lttng_models import get_field +from .lttng_models import get_name class EventHandler(): """Base event handling class.""" - def __init__(self, handler_map): + def __init__(self, handler_map: Dict[str, Callable[[Dict, EventMetadata], None]]) -> None: """ Constructor. - :param handler_map (map(str: function)): the mapping from event name to handling method + :param handler_map: the mapping from event name to handling method """ self._handler_map = handler_map - def handle_events(self, events): + def handle_events(self, events: List[Dict[str, str]]) -> None: """ Handle events by calling their handlers. - :param events (list(dict(str:str))): the events to process + :param events: the events to process """ for event in events: self._handle(event) - def _handle(self, event): - event_name = lttng_models.get_name(event) + def _handle(self, event: Dict[str, str]) -> None: + event_name = get_name(event) handler_function = self._handler_map.get(event_name, None) if handler_function is not None: - pid = lttng_models.get_field(event, - 'vpid', - default=lttng_models.get_field(event, - 'pid', - raise_if_not_found=False)) - tid = lttng_models.get_field(event, - 'vtid', - default=lttng_models.get_field(event, - 'tid', - raise_if_not_found=False)) - timestamp = lttng_models.get_field(event, '_timestamp') - procname = lttng_models.get_field(event, 'procname') - metadata = lttng_models.EventMetadata(event_name, pid, tid, timestamp, procname) + pid = get_field(event, + 'vpid', + default=get_field(event, + 'pid', + raise_if_not_found=False)) + tid = get_field(event, + 'vtid', + default=get_field(event, + 'tid', + raise_if_not_found=False)) + timestamp = get_field(event, '_timestamp') + procname = get_field(event, 'procname') + metadata = EventMetadata(event_name, pid, tid, timestamp, procname) handler_function(event, metadata) else: print(f'unhandled event name: {event_name}', file=sys.stderr) diff --git a/tracetools_analysis/analysis/load.py b/tracetools_analysis/analysis/load.py index 053d058..073b086 100644 --- a/tracetools_analysis/analysis/load.py +++ b/tracetools_analysis/analysis/load.py @@ -1,12 +1,14 @@ import pickle +from typing import Dict +from typing import List -def load_pickle(pickle_file_path): +def load_pickle(pickle_file_path: str) -> List[Dict]: """ Load pickle file containing converted trace events. - :param pickle_file_path (str): the path to the pickle file to load - :return list(dict): the list of events (dicts) read from the file + :param pickle_file_path: the path to the pickle file to load + :return: the list of events read from the file """ events = [] with open(pickle_file_path, 'rb') as f: diff --git a/tracetools_analysis/analysis/lttng_models.py b/tracetools_analysis/analysis/lttng_models.py index 7562d2f..2c54719 100644 --- a/tracetools_analysis/analysis/lttng_models.py +++ b/tracetools_analysis/analysis/lttng_models.py @@ -1,7 +1,10 @@ # Model objects for LTTng traces/events +from typing import Any +from typing import Dict -def get_field(event, field_name, default=None, raise_if_not_found=True): + +def get_field(event: Dict, field_name: str, default=None, raise_if_not_found=True) -> Any: field_value = event.get(field_name, default) # If enabled, raise exception as soon as possible to avoid headaches if raise_if_not_found and field_value is None: @@ -9,14 +12,14 @@ def get_field(event, field_name, default=None, raise_if_not_found=True): return field_value -def get_name(event): +def get_name(event: Dict) -> str: return get_field(event, '_name') class EventMetadata(): """Container for event metadata.""" - def __init__(self, event_name, pid, tid, timestamp, procname): + def __init__(self, event_name, pid, tid, timestamp, procname) -> None: self._event_name = event_name self._pid = pid self._tid = tid diff --git a/tracetools_analysis/analysis/ros2_processor.py b/tracetools_analysis/analysis/ros2_processor.py index 1664f9f..087b2b6 100644 --- a/tracetools_analysis/analysis/ros2_processor.py +++ b/tracetools_analysis/analysis/ros2_processor.py @@ -1,20 +1,12 @@ # Process trace events and create ROS model +from typing import Dict +from typing import List + from .data_model import DataModel from .handler import EventHandler from .lttng_models import get_field - - -def ros2_process(events): - """ - Process unpickled events and create ROS 2 model. - - :param events (list(dict(str:str:))): the list of events - :return the processor object - """ - processor = Ros2Processor() - processor.handle_events(events) - return processor +from .lttng_models import EventMetadata class Ros2Processor(EventHandler): @@ -24,7 +16,7 @@ class Ros2Processor(EventHandler): Handles a trace's events and builds a model with the data. """ - def __init__(self): + def __init__(self) -> None: # Link a ROS trace event to its corresponding handling method handler_map = { 'ros2:rcl_init': @@ -61,16 +53,16 @@ class Ros2Processor(EventHandler): # Temporary buffers self._callback_instances = {} - def get_data_model(self): + def get_data_model(self) -> DataModel: return self._data - def _handle_rcl_init(self, event, metadata): + def _handle_rcl_init(self, event: Dict, metadata: EventMetadata) -> None: context_handle = get_field(event, 'context_handle') timestamp = metadata.timestamp pid = metadata.pid self._data.add_context(context_handle, timestamp, pid) - def _handle_rcl_node_init(self, event, metadata): + def _handle_rcl_node_init(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'node_handle') timestamp = metadata.timestamp tid = metadata.tid @@ -79,7 +71,7 @@ class Ros2Processor(EventHandler): namespace = get_field(event, 'namespace') self._data.add_node(handle, timestamp, tid, rmw_handle, name, namespace) - def _handle_rcl_publisher_init(self, event, metadata): + def _handle_rcl_publisher_init(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'publisher_handle') timestamp = metadata.timestamp node_handle = get_field(event, 'node_handle') @@ -88,7 +80,7 @@ class Ros2Processor(EventHandler): depth = get_field(event, 'depth') self._data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth) - def _handle_subscription_init(self, event, metadata): + def _handle_subscription_init(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'subscription_handle') timestamp = metadata.timestamp node_handle = get_field(event, 'node_handle') @@ -97,13 +89,13 @@ class Ros2Processor(EventHandler): depth = get_field(event, 'depth') self._data.add_subscription(handle, timestamp, node_handle, rmw_handle, topic_name, depth) - def _handle_rclcpp_subscription_callback_added(self, event, metadata): + def _handle_rclcpp_subscription_callback_added(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'subscription_handle') timestamp = metadata.timestamp callback_object = get_field(event, 'callback') self._data.add_callback_object(handle, timestamp, callback_object) - def _handle_rcl_service_init(self, event, metadata): + def _handle_rcl_service_init(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'service_handle') timestamp = metadata.timestamp node_handle = get_field(event, 'node_handle') @@ -111,13 +103,13 @@ class Ros2Processor(EventHandler): service_name = get_field(event, 'service_name') self._data.add_service(handle, timestamp, node_handle, rmw_handle, service_name) - def _handle_rclcpp_service_callback_added(self, event, metadata): + def _handle_rclcpp_service_callback_added(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'service_handle') timestamp = metadata.timestamp callback_object = get_field(event, 'callback') self._data.add_callback_object(handle, timestamp, callback_object) - def _handle_rcl_client_init(self, event, metadata): + def _handle_rcl_client_init(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'client_handle') timestamp = metadata.timestamp node_handle = get_field(event, 'node_handle') @@ -125,30 +117,30 @@ class Ros2Processor(EventHandler): service_name = get_field(event, 'service_name') self._data.add_client(handle, timestamp, node_handle, rmw_handle, service_name) - def _handle_rcl_timer_init(self, event, metadata): + def _handle_rcl_timer_init(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'timer_handle') timestamp = metadata.timestamp period = get_field(event, 'period') self._data.add_timer(handle, timestamp, period) - def _handle_rclcpp_timer_callback_added(self, event, metadata): + def _handle_rclcpp_timer_callback_added(self, event: Dict, metadata: EventMetadata) -> None: handle = get_field(event, 'timer_handle') timestamp = metadata.timestamp callback_object = get_field(event, 'callback') self._data.add_callback_object(handle, timestamp, callback_object) - def _handle_rclcpp_callback_register(self, event, metadata): + def _handle_rclcpp_callback_register(self, event: Dict, metadata: EventMetadata) -> None: callback_object = get_field(event, 'callback') timestamp = metadata.timestamp symbol = get_field(event, 'symbol') self._data.add_callback_symbol(callback_object, timestamp, symbol) - def _handle_callback_start(self, event, metadata): + def _handle_callback_start(self, event: Dict, metadata: EventMetadata) -> None: # Add to dict callback_addr = get_field(event, 'callback') self._callback_instances[callback_addr] = (event, metadata) - def _handle_callback_end(self, event, metadata): + def _handle_callback_end(self, event: Dict, metadata: EventMetadata) -> None: # Fetch from dict callback_object = get_field(event, 'callback') (event_start, metadata_start) = self._callback_instances.get(callback_object) @@ -162,3 +154,15 @@ class Ros2Processor(EventHandler): bool(is_intra_process)) else: print(f'No matching callback start for callback object "{callback_object}"') + + +def ros2_process(events: List[Dict[str, str]]) -> Ros2Processor: + """ + Process unpickled events and create ROS 2 model. + + :param events: the list of events + :return: the processor object + """ + processor = Ros2Processor() + processor.handle_events(events) + return processor diff --git a/tracetools_analysis/conversion/ctf.py b/tracetools_analysis/conversion/ctf.py index c8cb14c..826231f 100644 --- a/tracetools_analysis/conversion/ctf.py +++ b/tracetools_analysis/conversion/ctf.py @@ -1,5 +1,7 @@ # CTF to pickle conversion +from pickle import Pickler + import babeltrace # List of ignored CTF fields @@ -10,12 +12,13 @@ _IGNORED_FIELDS = [ _DISCARD = 'events_discarded' -def ctf_to_pickle(trace_directory, target): +def ctf_to_pickle(trace_directory: str, target: Pickler) -> int: """ Load CTF trace and convert to a pickle file. - :param trace_directory (str): the main/top trace directory - :param target (Pickler): the target pickle file to write to + :param trace_directory: the main/top trace directory + :param target: the target pickle file to write to + :return: the number of events written """ # add traces tc = babeltrace.TraceCollection()