Use lists of dicts as intermediate storage & convert to df at the end

Backport of 3431b4b7b2

Signed-off-by: Christophe Bedard <bedard.christophe@gmail.com>
This commit is contained in:
Christophe Bedard 2021-03-31 19:25:02 -04:00
parent 56ad1162a2
commit 61a2783a4d
6 changed files with 202 additions and 104 deletions

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,11 +21,33 @@ class DataModel():
Container with pre-processed data for an analysis to use.
Contains data for an analysis to use. This is a middleground between trace events data and the
output data of an analysis. It uses pandas `DataFrame` directly.
output data of an analysis.
It uses native/simple Python data structures (e.g. lists of dicts) during processing, but
converts them to pandas `DataFrame` at the end.
"""
def __init__(self) -> None:
pass
self._finalized = False
def finalize(self) -> None:
"""
Finalize the data model.
Call this once data is done being generated or added to the model.
Finalization tasks are up to the inheriting/concrete class.
"""
# Avoid calling it twice for data models which might be shared
if not self._finalized:
self._finalized = True
self._finalize()
def _finalize(self) -> None:
"""
Do the finalization.
Only called once.
"""
raise NotImplementedError
def print_data(self) -> None:
"""Print the data model."""

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -29,12 +30,10 @@ class CpuTimeDataModel(DataModel):
def __init__(self) -> None:
"""Create a CpuTimeDataModel."""
super().__init__()
self.times = pd.DataFrame(columns=[
'tid',
'start_timestamp',
'duration',
'cpu_id',
])
# Intermediate
self._times = []
# Final
self.times = None
def add_duration(
self,
@ -43,13 +42,15 @@ class CpuTimeDataModel(DataModel):
duration: int,
cpu_id: int,
) -> None:
data = {
self._times.append({
'tid': tid,
'start_timestamp': start_timestamp,
'duration': duration,
'cpu_id': cpu_id,
}
self.times = self.times.append(data, ignore_index=True)
})
def _finalize(self) -> None:
self.times = pd.DataFrame.from_dict(self._times)
def print_data(self) -> None:
print('====================CPU TIME DATA MODEL====================')

View file

@ -1,4 +1,5 @@
# Copyright 2019 Apex.AI, Inc.
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -30,11 +31,10 @@ class MemoryUsageDataModel(DataModel):
def __init__(self) -> None:
"""Create a MemoryUsageDataModel."""
super().__init__()
self.memory_diff = pd.DataFrame(columns=[
'timestamp',
'tid',
'memory_diff',
])
# Intermediate
self._memory_diff = []
# Final
self.memory_diff = None
def add_memory_difference(
self,
@ -42,12 +42,14 @@ class MemoryUsageDataModel(DataModel):
tid: int,
memory_diff: int,
) -> None:
data = {
self._memory_diff.append({
'timestamp': timestamp,
'tid': tid,
'memory_diff': memory_diff,
}
self.memory_diff = self.memory_diff.append(data, ignore_index=True)
})
def _finalize(self) -> None:
self.memory_diff = pd.DataFrame.from_dict(self._memory_diff)
def print_data(self) -> None:
print('==================MEMORY USAGE DATA MODEL==================')

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -32,15 +33,10 @@ class ProfileDataModel(DataModel):
def __init__(self) -> None:
"""Create a ProfileDataModel."""
super().__init__()
self.times = pd.DataFrame(columns=[
'tid',
'depth',
'function_name',
'parent_name',
'start_timestamp',
'duration',
'actual_duration',
])
# Intermediate
self._times = []
# Final
self.times = None
def add_duration(
self,
@ -52,7 +48,7 @@ class ProfileDataModel(DataModel):
duration: int,
actual_duration: int,
) -> None:
data = {
self._times.append({
'tid': tid,
'depth': depth,
'function_name': function_name,
@ -60,8 +56,10 @@ class ProfileDataModel(DataModel):
'start_timestamp': start_timestamp,
'duration': duration,
'actual_duration': actual_duration,
}
self.times = self.times.append(data, ignore_index=True)
})
def _finalize(self) -> None:
self.times = pd.DataFrame.from_dict(self._times)
def print_data(self) -> None:
print('====================PROFILE DATA MODEL====================')

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2020-2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -29,130 +30,187 @@ class Ros2DataModel(DataModel):
def __init__(self) -> None:
"""Create a Ros2DataModel."""
super().__init__()
# Objects (one-time events, usually when something is created)
self.contexts = pd.DataFrame(columns=['context_handle',
'timestamp',
'pid',
'version'])
self.contexts.set_index(['context_handle'], inplace=True, drop=True)
self.nodes = pd.DataFrame(columns=['node_handle',
'timestamp',
'tid',
'rmw_handle',
'name',
'namespace'])
self.nodes.set_index(['node_handle'], inplace=True, drop=True)
self.publishers = pd.DataFrame(columns=['publisher_handle',
'timestamp',
'node_handle',
'rmw_handle',
'topic_name',
'depth'])
self.publishers.set_index(['publisher_handle'], inplace=True, drop=True)
self.subscriptions = pd.DataFrame(columns=['subscription_handle',
'timestamp',
'node_handle',
'rmw_handle',
'topic_name',
'depth'])
self.subscriptions.set_index(['subscription_handle'], inplace=True, drop=True)
self.subscription_objects = pd.DataFrame(columns=['subscription',
'timestamp',
'subscription_handle'])
self.subscription_objects.set_index(['subscription'], inplace=True, drop=True)
self.services = pd.DataFrame(columns=['service_handle',
'timestamp',
'node_handle',
'rmw_handle',
'service_name'])
self.services.set_index(['service_handle'], inplace=True, drop=True)
self.clients = pd.DataFrame(columns=['client_handle',
'timestamp',
'node_handle',
'rmw_handle',
'service_name'])
self.clients.set_index(['client_handle'], inplace=True, drop=True)
self.timers = pd.DataFrame(columns=['timer_handle',
'timestamp',
'period',
'tid'])
self.timers.set_index(['timer_handle'], inplace=True, drop=True)
self.callback_objects = pd.DataFrame(columns=['reference',
'timestamp',
'callback_object'])
self.callback_objects.set_index(['reference'], inplace=True, drop=True)
self.callback_symbols = pd.DataFrame(columns=['callback_object',
'timestamp',
'symbol'])
self.callback_symbols.set_index(['callback_object'], inplace=True, drop=True)
# Intermediate
self._contexts = []
self._nodes = []
self._publishers = []
self._subscriptions = []
self._subscription_objects = []
self._services = []
self._clients = []
self._timers = []
self._callback_objects = []
self._callback_symbols = []
self._callback_instances = []
# Final
# Objects (one-time events, usually when something is created)
self.contexts = None
self.nodes = None
self.publishers = None
self.subscriptions = None
self.subscription_objects = None
self.services = None
self.clients = None
self.timers = None
self.callback_objects = None
self.callback_symbols = None
# Events (multiple instances, may not have a meaningful index)
self.callback_instances = pd.DataFrame(columns=['callback_object',
'timestamp',
'duration',
'intra_process'])
self.callback_instances = None
def add_context(
self, context_handle, timestamp, pid, version
) -> None:
self.contexts.loc[context_handle] = [timestamp, pid, version]
self._contexts.append({
'context_handle': context_handle,
'timestamp': timestamp,
'pid': pid,
'version': version,
})
def add_node(
self, node_handle, timestamp, tid, rmw_handle, name, namespace
) -> None:
self.nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace]
self._nodes.append({
'node_handle': node_handle,
'timestamp': timestamp,
'tid': tid,
'rmw_handle': rmw_handle,
'name': name,
'namespace': namespace,
})
def add_publisher(
self, handle, timestamp, node_handle, rmw_handle, topic_name, depth
) -> None:
self.publishers.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
self._publishers.append({
'publisher_handle': handle,
'timestamp': timestamp,
'node_handle': node_handle,
'rmw_handle': rmw_handle,
'topic_name': topic_name,
'depth': depth,
})
def add_rcl_subscription(
self, handle, timestamp, node_handle, rmw_handle, topic_name, depth
) -> None:
self.subscriptions.loc[handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
self._subscriptions.append({
'subscription_handle': handle,
'timestamp': timestamp,
'node_handle': node_handle,
'rmw_handle': rmw_handle,
'topic_name': topic_name,
'depth': depth,
})
def add_rclcpp_subscription(
self, subscription_pointer, timestamp, subscription_handle
) -> None:
self.subscription_objects.loc[subscription_pointer] = [timestamp, subscription_handle]
self._subscription_objects.append({
'subscription': subscription_pointer,
'timestamp': timestamp,
'subscription_handle': subscription_handle,
})
def add_service(
self, handle, timestamp, node_handle, rmw_handle, service_name
) -> None:
self.services.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
self._services.append({
'service_handle': timestamp,
'timestamp': timestamp,
'node_handle': node_handle,
'rmw_handle': rmw_handle,
'service_name': service_name,
})
def add_client(
self, handle, timestamp, node_handle, rmw_handle, service_name
) -> None:
self.clients.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
self._clients.append({
'client_handle': handle,
'timestamp': timestamp,
'node_handle': node_handle,
'rmw_handle': rmw_handle,
'service_name': service_name,
})
def add_timer(
self, handle, timestamp, period, tid
) -> None:
self.timers.loc[handle] = [timestamp, period, tid]
self._timers.append({
'timer_handle': handle,
'timestamp': timestamp,
'period': period,
'tid': tid,
})
def add_callback_object(
self, reference, timestamp, callback_object
) -> None:
self.callback_objects.loc[reference] = [timestamp, callback_object]
self._callback_objects.append({
'reference': reference,
'timestamp': timestamp,
'callback_object': callback_object,
})
def add_callback_symbol(
self, callback_object, timestamp, symbol
) -> None:
self.callback_symbols.loc[callback_object] = [timestamp, symbol]
self._callback_symbols.append({
'callback_object': callback_object,
'timestamp': timestamp,
'symbol': symbol,
})
def add_callback_instance(
self, callback_object, timestamp, duration, intra_process
) -> None:
data = {
self._callback_instances.append({
'callback_object': callback_object,
'timestamp': timestamp,
'duration': duration,
'intra_process': intra_process,
}
self.callback_instances = self.callback_instances.append(data, ignore_index=True)
})
def _finalize(self) -> None:
# Some of the lists of dicts might be empty, and setting
# the index for an empty dataframe leads to an error
self.contexts = pd.DataFrame.from_dict(self._contexts)
if self._contexts:
self.contexts.set_index('context_handle', inplace=True, drop=True)
self.nodes = pd.DataFrame.from_dict(self._nodes)
if self._nodes:
self.nodes.set_index('node_handle', inplace=True, drop=True)
self.publishers = pd.DataFrame.from_dict(self._publishers)
if self._publishers:
self.publishers.set_index('publisher_handle', inplace=True, drop=True)
self.subscriptions = pd.DataFrame.from_dict(self._subscriptions)
if self._subscriptions:
self.subscriptions.set_index('subscription_handle', inplace=True, drop=True)
self.subscription_objects = pd.DataFrame.from_dict(self._subscription_objects)
if self._subscription_objects:
self.subscription_objects.set_index('subscription', inplace=True, drop=True)
self.services = pd.DataFrame.from_dict(self._services)
if self._services:
self.services.set_index('service_handle', inplace=True, drop=True)
self.clients = pd.DataFrame.from_dict(self._clients)
if self._clients:
self.clients.set_index('client_handle', inplace=True, drop=True)
self.timers = pd.DataFrame.from_dict(self._timers)
if self._timers:
self.timers.set_index('timer_handle', inplace=True, drop=True)
self.callback_objects = pd.DataFrame.from_dict(self._callback_objects)
if self._callback_objects:
self.callback_objects.set_index('reference', inplace=True, drop=True)
self.callback_symbols = pd.DataFrame.from_dict(self._callback_symbols)
if self._callback_symbols:
self.callback_symbols.set_index('callback_object', inplace=True, drop=True)
# Events (multiple instances, may not have a meaningful index)
self.callback_instances = pd.DataFrame.from_dict(self._callback_instances)
def print_data(self) -> None:
print('====================ROS 2 DATA MODEL===================')

View file

@ -1,4 +1,5 @@
# Copyright 2019 Robert Bosch GmbH
# Copyright 2021 Christophe Bedard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -196,6 +197,15 @@ class EventHandler(Dependant):
processor.process(events)
return handler_object
def finalize(self) -> None:
"""
Finalize the event handler.
This should be called at the end, once all events have been processed.
"""
if self._data_model:
self._data_model.finalize()
class DependencySolver():
"""
@ -417,6 +427,7 @@ class Processor():
self._process_event(event)
self._progress_display.did_work()
self._progress_display.done(erase=erase_progress)
self._finalize_processing()
self._processing_done = True
def _process_event(self, event: DictEvent) -> None:
@ -450,6 +461,11 @@ class Processor():
metadata = EventMetadata(event_name, timestamp, cpu_id, procname, pid, tid)
handler_function(event, metadata)
def _finalize_processing(self) -> None:
"""Notify handlers that processing is done by calling corresponding method."""
for handler in self._expanded_handlers:
handler.finalize()
def print_data(self) -> None:
"""Print processed data."""
if self._processing_done: