diff --git a/tracetools_analysis/analysis/data_model.py b/tracetools_analysis/analysis/data_model.py index f6a0296..f5565a8 100644 --- a/tracetools_analysis/analysis/data_model.py +++ b/tracetools_analysis/analysis/data_model.py @@ -22,17 +22,20 @@ class DataModel(): self._publishers.set_index(['publisher_handle'], inplace=True, drop=True) self._subscriptions = pd.DataFrame(columns=['subscription_handle', 'timestamp', 'node_handle', 'rmw_handle', 'topic_name', 'depth']) self._subscriptions.set_index(['subscription_handle'], inplace=True, drop=True) + self._subscription_callback_objects = pd.DataFrame(columns=['subscription_handle', 'timestamp', 'callback_object']) + self._subscription_callback_objects.set_index(['subscription_handle'], inplace=True, drop=True) + self._callbacks = pd.DataFrame(columns=['callback_object', 'timestamp', 'symbol']) + self._callbacks.set_index(['callback_object'], inplace=True, drop=True) self._services = pd.DataFrame(columns=[]) self._clients = pd.DataFrame(columns=[]) self._timers = pd.DataFrame(columns=[]) - # Events - # TODO + # Events (multiple instances, may not have a meaningful index) + self._subscription_callbacks = pd.DataFrame(columns=['callback_object', 'timestamp', 'duration', 'intra_process']) def add_context(self, context_handle, timestamp, pid): self._contexts.loc[context_handle] = [timestamp, pid] - # self._contexts = self._contexts.append({'context_handle': context_handle, 'timestamp': timestamp, 'pid': pid}, ignore_index=True) def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace): self._nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace] @@ -43,6 +46,15 @@ class DataModel(): def add_subscription(self, subscription_handle, timestamp, node_handle, rmw_handle, topic_name, depth): self._subscriptions.loc[subscription_handle] = [timestamp, node_handle, rmw_handle, topic_name, depth] + def add_subscription_callback_object(self, subscription_handle, timestamp, callback_object): + self._subscription_callback_objects.loc[subscription_handle] = [timestamp, callback_object] + + def add_callback(self, callback_object, timestamp, symbol): + self._callbacks.loc[callback_object] = [timestamp, symbol] + + def add_subscription_callback_instance(self, callback_object, timestamp, duration, intra_process): + self._subscription_callbacks = self._subscription_callbacks.append({'callback_object': callback_object, 'timestamp': timestamp, 'duration': duration, 'intra_process': intra_process}, ignore_index=True) + def print(self): """Debug method to print every contained df.""" print('====================DATA MODEL====================') @@ -52,11 +64,17 @@ class DataModel(): print() print(f'Publishers:\n{self._publishers.to_string()}') print() - print(f'Subscription:\n{self._subscriptions.to_string()}') + print(f'Subscriptions:\n{self._subscriptions.to_string()}') + print() + print(f'Subscription callbacks:\n{self._subscription_callback_objects.to_string()}') + print() + print(f'Subscription callback instances:\n{self._subscription_callbacks.to_string()}') print() print(f'Services:\n{self._services.to_string()}') print() print(f'Clients:\n{self._clients.to_string()}') print() print(f'Timers:\n{self._timers.to_string()}') + print() + print(f'Callbacks:\n{self._callbacks.to_string()}') print('==================================================') diff --git a/tracetools_analysis/analysis/processor.py b/tracetools_analysis/analysis/processor.py index d4a49d5..d8c0b46 100644 --- a/tracetools_analysis/analysis/processor.py +++ b/tracetools_analysis/analysis/processor.py @@ -65,6 +65,9 @@ class RosProcessor(EventHandler): super().__init__(handler_map) self._data = DataModel() + + # Temporary buffers + self._callback_instances = {} def get_data_model(self): return self._data @@ -97,26 +100,29 @@ class RosProcessor(EventHandler): self._data.add_subscription(sub_handle, metadata.timestamp, node_handle, rmw_handle, topic_name, depth) def _handle_rclcpp_subscription_callback_added(self, event, metadata): - # TODO - pass - # Add the callback address key and create an empty list - # callback_addr = get_field(event, 'callback') - # self.callbacks_instances[callback_addr] = [] + subscription_handle = get_field(event, 'subscription_handle') + callback_object = get_field(event, 'callback') + self._data.add_subscription_callback_object(subscription_handle, metadata.timestamp, callback_object) def _handle_rclcpp_subscription_callback_start(self, event, metadata): - # TODO - pass - # callback_addr = get_field(event, 'callback') - # self._callback_starts[callback_addr] = metadata.timestamp + # Add to dict + callback_addr = get_field(event, 'callback') + self._callback_instances[callback_addr] = (event, metadata) def _handle_rclcpp_subscription_callback_end(self, event, metadata): - # TODO - pass - # callback_addr = get_field(event, 'callback') - # start_timestamp = self._callback_starts.pop(callback_addr, None) - # if start_timestamp is not None: - # duration = metadata.timestamp - start_timestamp - # self.callbacks_instances[callback_addr].append((duration, start_timestamp)) + # Fetch from dict + callback_object = get_field(event, 'callback') + (event_start, metadata_start) = self._callback_instances.get(callback_object) + if event_start is not None and metadata_start is not None: + del self._callback_instances[callback_object] + duration = metadata.timestamp - metadata_start.timestamp + is_intra_process = get_field(event_start, 'is_intra_process') + self._data.add_subscription_callback_instance(callback_object, + metadata_start.timestamp, + duration, + bool(is_intra_process)) + else: + print(f'No matching callback start for callback object "{callback_object}"') def _handle_rcl_service_init(self, event, metadata): # TODO @@ -155,5 +161,6 @@ class RosProcessor(EventHandler): pass def _handle_rclcpp_callback_register(self, event, metadata): - # TODO - pass + callback_object = get_field(event, 'callback') + symbol = get_field(event, 'symbol') + self._data.add_callback(callback_object, metadata.timestamp, symbol)