diff --git a/tracetools_analysis/test/tracetools_analysis/test_data_model_util.py b/tracetools_analysis/test/tracetools_analysis/test_data_model_util.py index 66f2cc3..36c9156 100644 --- a/tracetools_analysis/test/tracetools_analysis/test_data_model_util.py +++ b/tracetools_analysis/test/tracetools_analysis/test_data_model_util.py @@ -18,7 +18,7 @@ from typing import Dict import unittest from pandas import DataFrame -from pandas.util.testing import assert_frame_equal +from pandas.testing import assert_frame_equal from tracetools_analysis.data_model import DataModel from tracetools_analysis.processor import EventHandler diff --git a/tracetools_analysis/test/tracetools_analysis/test_profile_handler.py b/tracetools_analysis/test/tracetools_analysis/test_profile_handler.py index 1fba862..287abe9 100644 --- a/tracetools_analysis/test/tracetools_analysis/test_profile_handler.py +++ b/tracetools_analysis/test/tracetools_analysis/test_profile_handler.py @@ -18,7 +18,7 @@ from typing import List import unittest from pandas import DataFrame -from pandas.util.testing import assert_frame_equal +from pandas.testing import assert_frame_equal from tracetools_analysis.processor import Processor from tracetools_analysis.processor.profile import ProfileHandler @@ -286,17 +286,8 @@ class TestProfileHandler(unittest.TestCase): @staticmethod def build_expected_df(expected_data: List[Dict[str, Any]]) -> DataFrame: - # Make sure the columns are in the same order - expected_df = DataFrame(columns=[ - 'tid', - 'depth', - 'function_name', - 'parent_name', - 'start_timestamp', - 'duration', - 'actual_duration', - ]) - return expected_df.append(expected_data, ignore_index=True) + # Columns should be in the same order + return DataFrame.from_dict(expected_data) @staticmethod def transform_fake_fields(events: List[DictEvent]) -> None: diff --git a/tracetools_analysis/tracetools_analysis/data_model/__init__.py b/tracetools_analysis/tracetools_analysis/data_model/__init__.py index c5cbe29..4459da6 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/__init__.py +++ b/tracetools_analysis/tracetools_analysis/data_model/__init__.py @@ -1,4 +1,5 @@ # Copyright 2019 Robert Bosch GmbH +# Copyright 2021 Christophe Bedard # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +15,46 @@ """Base data model module.""" +from typing import Any +from typing import Dict +from typing import List + + +DataModelIntermediateStorage = List[Dict[str, Any]] + class DataModel(): """ Container with pre-processed data for an analysis to use. Contains data for an analysis to use. This is a middleground between trace events data and the - output data of an analysis. It uses pandas `DataFrame` directly. + output data of an analysis. + It uses native/simple Python data structures (e.g. lists of dicts) during processing, but + converts them to pandas `DataFrame` at the end. """ def __init__(self) -> None: - pass + self._finalized = False + + def finalize(self) -> None: + """ + Finalize the data model. + + Call this once data is done being generated or added to the model. + Finalization tasks are up to the inheriting/concrete class. + """ + # Avoid calling it twice for data models which might be shared + if not self._finalized: + self._finalized = True + self._finalize() + + def _finalize(self) -> None: + """ + Do the finalization. + + Only called once. + """ + raise NotImplementedError def print_data(self) -> None: """Print the data model.""" diff --git a/tracetools_analysis/tracetools_analysis/data_model/cpu_time.py b/tracetools_analysis/tracetools_analysis/data_model/cpu_time.py index fafcdbf..f255e16 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/cpu_time.py +++ b/tracetools_analysis/tracetools_analysis/data_model/cpu_time.py @@ -1,4 +1,5 @@ # Copyright 2019 Robert Bosch GmbH +# Copyright 2021 Christophe Bedard # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ import pandas as pd from . import DataModel +from . import DataModelIntermediateStorage class CpuTimeDataModel(DataModel): @@ -29,12 +31,7 @@ class CpuTimeDataModel(DataModel): def __init__(self) -> None: """Create a CpuTimeDataModel.""" super().__init__() - self.times = pd.DataFrame(columns=[ - 'tid', - 'start_timestamp', - 'duration', - 'cpu_id', - ]) + self._times: DataModelIntermediateStorage = [] def add_duration( self, @@ -43,13 +40,15 @@ class CpuTimeDataModel(DataModel): duration: int, cpu_id: int, ) -> None: - data = { + self._times.append({ 'tid': tid, 'start_timestamp': start_timestamp, 'duration': duration, 'cpu_id': cpu_id, - } - self.times = self.times.append(data, ignore_index=True) + }) + + def _finalize(self) -> None: + self.times = pd.DataFrame.from_dict(self._times) def print_data(self) -> None: print('====================CPU TIME DATA MODEL====================') diff --git a/tracetools_analysis/tracetools_analysis/data_model/memory_usage.py b/tracetools_analysis/tracetools_analysis/data_model/memory_usage.py index 8eac7fc..0eabf8a 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/memory_usage.py +++ b/tracetools_analysis/tracetools_analysis/data_model/memory_usage.py @@ -1,4 +1,5 @@ # Copyright 2019 Apex.AI, Inc. +# Copyright 2021 Christophe Bedard # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ import pandas as pd from . import DataModel +from . import DataModelIntermediateStorage class MemoryUsageDataModel(DataModel): @@ -30,11 +32,7 @@ class MemoryUsageDataModel(DataModel): def __init__(self) -> None: """Create a MemoryUsageDataModel.""" super().__init__() - self.memory_diff = pd.DataFrame(columns=[ - 'timestamp', - 'tid', - 'memory_diff', - ]) + self._memory_diff: DataModelIntermediateStorage = [] def add_memory_difference( self, @@ -42,12 +40,14 @@ class MemoryUsageDataModel(DataModel): tid: int, memory_diff: int, ) -> None: - data = { + self._memory_diff.append({ 'timestamp': timestamp, 'tid': tid, 'memory_diff': memory_diff, - } - self.memory_diff = self.memory_diff.append(data, ignore_index=True) + }) + + def _finalize(self) -> None: + self.memory_diff = pd.DataFrame.from_dict(self._memory_diff) def print_data(self) -> None: print('==================MEMORY USAGE DATA MODEL==================') diff --git a/tracetools_analysis/tracetools_analysis/data_model/profile.py b/tracetools_analysis/tracetools_analysis/data_model/profile.py index 0fcd1f8..0845647 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/profile.py +++ b/tracetools_analysis/tracetools_analysis/data_model/profile.py @@ -1,4 +1,5 @@ # Copyright 2019 Robert Bosch GmbH +# Copyright 2021 Christophe Bedard # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,6 +20,7 @@ from typing import Optional import pandas as pd from . import DataModel +from . import DataModelIntermediateStorage class ProfileDataModel(DataModel): @@ -32,15 +34,7 @@ class ProfileDataModel(DataModel): def __init__(self) -> None: """Create a ProfileDataModel.""" super().__init__() - self.times = pd.DataFrame(columns=[ - 'tid', - 'depth', - 'function_name', - 'parent_name', - 'start_timestamp', - 'duration', - 'actual_duration', - ]) + self._times: DataModelIntermediateStorage = [] def add_duration( self, @@ -52,7 +46,7 @@ class ProfileDataModel(DataModel): duration: int, actual_duration: int, ) -> None: - data = { + self._times.append({ 'tid': tid, 'depth': depth, 'function_name': function_name, @@ -60,8 +54,10 @@ class ProfileDataModel(DataModel): 'start_timestamp': start_timestamp, 'duration': duration, 'actual_duration': actual_duration, - } - self.times = self.times.append(data, ignore_index=True) + }) + + def _finalize(self) -> None: + self.times = pd.DataFrame.from_dict(self._times) def print_data(self) -> None: print('====================PROFILE DATA MODEL====================') diff --git a/tracetools_analysis/tracetools_analysis/data_model/ros2.py b/tracetools_analysis/tracetools_analysis/data_model/ros2.py index 70a80b9..7c0799c 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/ros2.py +++ b/tracetools_analysis/tracetools_analysis/data_model/ros2.py @@ -1,4 +1,5 @@ # Copyright 2019 Robert Bosch GmbH +# Copyright 2020-2021 Christophe Bedard # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ import pandas as pd from . import DataModel +from . import DataModelIntermediateStorage class Ros2DataModel(DataModel): @@ -30,129 +32,168 @@ class Ros2DataModel(DataModel): """Create a Ros2DataModel.""" super().__init__() # Objects (one-time events, usually when something is created) - self.contexts = pd.DataFrame(columns=['context_handle', - 'timestamp', - 'pid', - 'version']) - self.contexts.set_index(['context_handle'], inplace=True, drop=True) - self.nodes = pd.DataFrame(columns=['node_handle', - 'timestamp', - 'tid', - 'rmw_handle', - 'name', - 'namespace']) - self.nodes.set_index(['node_handle'], inplace=True, drop=True) - self.publishers = pd.DataFrame(columns=['publisher_handle', - 'timestamp', - 'node_handle', - 'rmw_handle', - 'topic_name', - 'depth']) - self.publishers.set_index(['publisher_handle'], inplace=True, drop=True) - self.subscriptions = pd.DataFrame(columns=['subscription_handle', - 'timestamp', - 'node_handle', - 'rmw_handle', - 'topic_name', - 'depth']) - self.subscriptions.set_index(['subscription_handle'], inplace=True, drop=True) - self.subscription_objects = pd.DataFrame(columns=['subscription', - 'timestamp', - 'subscription_handle']) - self.subscription_objects.set_index(['subscription'], inplace=True, drop=True) - self.services = pd.DataFrame(columns=['service_handle', - 'timestamp', - 'node_handle', - 'rmw_handle', - 'service_name']) - self.services.set_index(['service_handle'], inplace=True, drop=True) - self.clients = pd.DataFrame(columns=['client_handle', - 'timestamp', - 'node_handle', - 'rmw_handle', - 'service_name']) - self.clients.set_index(['client_handle'], inplace=True, drop=True) - self.timers = pd.DataFrame(columns=['timer_handle', - 'timestamp', - 'period', - 'tid']) - self.timers.set_index(['timer_handle'], inplace=True, drop=True) - - self.callback_objects = pd.DataFrame(columns=['reference', - 'timestamp', - 'callback_object']) - self.callback_objects.set_index(['reference'], inplace=True, drop=True) - self.callback_symbols = pd.DataFrame(columns=['callback_object', - 'timestamp', - 'symbol']) - self.callback_symbols.set_index(['callback_object'], inplace=True, drop=True) - + self._contexts: DataModelIntermediateStorage = [] + self._nodes: DataModelIntermediateStorage = [] + self._publishers: DataModelIntermediateStorage = [] + self._subscriptions: DataModelIntermediateStorage = [] + self._subscription_objects: DataModelIntermediateStorage = [] + self._services: DataModelIntermediateStorage = [] + self._clients: DataModelIntermediateStorage = [] + self._timers: DataModelIntermediateStorage = [] + self._callback_objects: DataModelIntermediateStorage = [] + self._callback_symbols: DataModelIntermediateStorage = [] # Events (multiple instances, may not have a meaningful index) - self.callback_instances = pd.DataFrame(columns=['callback_object', - 'timestamp', - 'duration', - 'intra_process']) + self._callback_instances: DataModelIntermediateStorage = [] def add_context( self, context_handle, timestamp, pid, version ) -> None: - self.contexts.loc[context_handle] = [timestamp, pid, version] + self._contexts.append({ + 'context_handle': context_handle, + 'timestamp': timestamp, + 'pid': pid, + 'version': version, + }) def add_node( self, node_handle, timestamp, tid, rmw_handle, name, namespace ) -> None: - self.nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace] + self._nodes.append({ + 'node_handle': node_handle, + 'timestamp': timestamp, + 'tid': tid, + 'rmw_handle': rmw_handle, + 'name': name, + 'namespace': namespace, + }) def add_publisher( self, handle, timestamp, node_handle, rmw_handle, topic_name, depth ) -> None: - self.publishers.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth] + self._publishers.append({ + 'publisher_handle': handle, + 'timestamp': timestamp, + 'node_handle': node_handle, + 'rmw_handle': rmw_handle, + 'topic_name': topic_name, + 'depth': depth, + }) def add_rcl_subscription( self, handle, timestamp, node_handle, rmw_handle, topic_name, depth ) -> None: - self.subscriptions.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth] + self._subscriptions.append({ + 'subscription_handle': handle, + 'timestamp': timestamp, + 'node_handle': node_handle, + 'rmw_handle': rmw_handle, + 'topic_name': topic_name, + 'depth': depth, + }) def add_rclcpp_subscription( self, subscription_pointer, timestamp, subscription_handle ) -> None: - self.subscription_objects.loc[subscription_pointer] = [timestamp, subscription_handle] + self._subscription_objects.append({ + 'subscription': subscription_pointer, + 'timestamp': timestamp, + 'subscription_handle': subscription_handle, + }) def add_service( self, handle, timestamp, node_handle, rmw_handle, service_name ) -> None: - self.services.loc[handle] = [timestamp, node_handle, rmw_handle, service_name] + self._services.append({ + 'service_handle': timestamp, + 'timestamp': timestamp, + 'node_handle': node_handle, + 'rmw_handle': rmw_handle, + 'service_name': service_name, + }) def add_client( self, handle, timestamp, node_handle, rmw_handle, service_name ) -> None: - self.clients.loc[handle] = [timestamp, node_handle, rmw_handle, service_name] + self._clients.append({ + 'client_handle': handle, + 'timestamp': timestamp, + 'node_handle': node_handle, + 'rmw_handle': rmw_handle, + 'service_name': service_name, + }) def add_timer( self, handle, timestamp, period, tid ) -> None: - self.timers.loc[handle] = [timestamp, period, tid] + self._timers.append({ + 'timer_handle': handle, + 'timestamp': timestamp, + 'period': period, + 'tid': tid, + }) def add_callback_object( self, reference, timestamp, callback_object ) -> None: - self.callback_objects.loc[reference] = [timestamp, callback_object] + self._callback_objects.append({ + 'reference': reference, + 'timestamp': timestamp, + 'callback_object': callback_object, + }) def add_callback_symbol( self, callback_object, timestamp, symbol ) -> None: - self.callback_symbols.loc[callback_object] = [timestamp, symbol] + self._callback_symbols.append({ + 'callback_object': callback_object, + 'timestamp': timestamp, + 'symbol': symbol, + }) def add_callback_instance( self, callback_object, timestamp, duration, intra_process ) -> None: - data = { + self._callback_instances.append({ 'callback_object': callback_object, 'timestamp': timestamp, 'duration': duration, 'intra_process': intra_process, - } - self.callback_instances = self.callback_instances.append(data, ignore_index=True) + }) + + def _finalize(self) -> None: + # Some of the lists of dicts might be empty, and setting + # the index for an empty dataframe leads to an error + self.contexts = pd.DataFrame.from_dict(self._contexts) + if self._contexts: + self.contexts.set_index('context_handle', inplace=True, drop=True) + self.nodes = pd.DataFrame.from_dict(self._nodes) + if self._nodes: + self.nodes.set_index('node_handle', inplace=True, drop=True) + self.publishers = pd.DataFrame.from_dict(self._publishers) + if self._publishers: + self.publishers.set_index('publisher_handle', inplace=True, drop=True) + self.subscriptions = pd.DataFrame.from_dict(self._subscriptions) + if self._subscriptions: + self.subscriptions.set_index('subscription_handle', inplace=True, drop=True) + self.subscription_objects = pd.DataFrame.from_dict(self._subscription_objects) + if self._subscription_objects: + self.subscription_objects.set_index('subscription', inplace=True, drop=True) + self.services = pd.DataFrame.from_dict(self._services) + if self._services: + self.services.set_index('service_handle', inplace=True, drop=True) + self.clients = pd.DataFrame.from_dict(self._clients) + if self._clients: + self.clients.set_index('client_handle', inplace=True, drop=True) + self.timers = pd.DataFrame.from_dict(self._timers) + if self._timers: + self.timers.set_index('timer_handle', inplace=True, drop=True) + self.callback_objects = pd.DataFrame.from_dict(self._callback_objects) + if self._callback_objects: + self.callback_objects.set_index('reference', inplace=True, drop=True) + self.callback_symbols = pd.DataFrame.from_dict(self._callback_symbols) + if self._callback_symbols: + self.callback_symbols.set_index('callback_object', inplace=True, drop=True) + self.callback_instances = pd.DataFrame.from_dict(self._callback_instances) def print_data(self) -> None: print('====================ROS 2 DATA MODEL===================') diff --git a/tracetools_analysis/tracetools_analysis/processor/__init__.py b/tracetools_analysis/tracetools_analysis/processor/__init__.py index 380e970..6a95dd5 100644 --- a/tracetools_analysis/tracetools_analysis/processor/__init__.py +++ b/tracetools_analysis/tracetools_analysis/processor/__init__.py @@ -1,4 +1,5 @@ # Copyright 2019 Robert Bosch GmbH +# Copyright 2021 Christophe Bedard # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -196,6 +197,15 @@ class EventHandler(Dependant): processor.process(events) return handler_object + def finalize(self) -> None: + """ + Finalize the event handler. + + This should be called at the end, once all events have been processed. + """ + if self._data_model: + self._data_model.finalize() + class DependencySolver(): """ @@ -417,6 +427,7 @@ class Processor(): self._process_event(event) self._progress_display.did_work() self._progress_display.done(erase=erase_progress) + self._finalize_processing() self._processing_done = True def _process_event(self, event: DictEvent) -> None: @@ -450,6 +461,11 @@ class Processor(): metadata = EventMetadata(event_name, timestamp, cpu_id, procname, pid, tid) handler_function(event, metadata) + def _finalize_processing(self) -> None: + """Notify handlers that processing is done by calling corresponding method.""" + for handler in self._expanded_handlers: + handler.finalize() + def print_data(self) -> None: """Print processed data.""" if self._processing_done: