diff --git a/tracetools_analysis/tracetools_analysis/data_model/ros2.py b/tracetools_analysis/tracetools_analysis/data_model/ros2.py index 8fe8462..c26c053 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/ros2.py +++ b/tracetools_analysis/tracetools_analysis/data_model/ros2.py @@ -46,6 +46,9 @@ class Ros2DataModel(DataModel): self._callback_symbols: DataModelIntermediateStorage = [] self._lifecycle_state_machines: DataModelIntermediateStorage = [] # Events (multiple instances, may not have a meaningful index) + self._rclcpp_publish_instances: DataModelIntermediateStorage = [] + self._rcl_publish_instances: DataModelIntermediateStorage = [] + self._rmw_publish_instances: DataModelIntermediateStorage = [] self._callback_instances: DataModelIntermediateStorage = [] self._lifecycle_transitions: DataModelIntermediateStorage = [] @@ -83,6 +86,31 @@ class Ros2DataModel(DataModel): 'depth': depth, }) + def add_rclcpp_publish_instance( + self, timestamp, message, + ) -> None: + self._rclcpp_publish_instances.append({ + 'timestamp': timestamp, + 'message': message, + }) + + def add_rcl_publish_instance( + self, publisher_handle, timestamp, message, + ) -> None: + self._rcl_publish_instances.append({ + 'publisher_handle': publisher_handle, + 'timestamp': timestamp, + 'message': message, + }) + + def add_rmw_publish_instance( + self, timestamp, message, + ) -> None: + self._rmw_publish_instances.append({ + 'timestamp': timestamp, + 'message': message, + }) + def add_rcl_subscription( self, handle, timestamp, node_handle, rmw_handle, topic_name, depth ) -> None: @@ -231,6 +259,9 @@ class Ros2DataModel(DataModel): if self._lifecycle_state_machines: self.lifecycle_state_machines.set_index( 'state_machine_handle', inplace=True, drop=True) + self.rclcpp_publish_instances = pd.DataFrame.from_dict(self._rclcpp_publish_instances) + self.rcl_publish_instances = pd.DataFrame.from_dict(self._rcl_publish_instances) + self.rmw_publish_instances = pd.DataFrame.from_dict(self._rmw_publish_instances) self.callback_instances = pd.DataFrame.from_dict(self._callback_instances) self.lifecycle_transitions = pd.DataFrame.from_dict(self._lifecycle_transitions) @@ -245,6 +276,15 @@ class Ros2DataModel(DataModel): print('Publishers:') print(self.publishers.to_string()) print() + print('Publish instances (rclcpp):') + print(self.rclcpp_publish_instances.to_string()) + print() + print('Publish instances (rcl):') + print(self.rcl_publish_instances.to_string()) + print() + print('Publish instances (rmw):') + print(self.rmw_publish_instances.to_string()) + print() print('Subscriptions:') print(self.subscriptions.to_string()) print() diff --git a/tracetools_analysis/tracetools_analysis/processor/ros2.py b/tracetools_analysis/tracetools_analysis/processor/ros2.py index a37731c..9ce9ed1 100644 --- a/tracetools_analysis/tracetools_analysis/processor/ros2.py +++ b/tracetools_analysis/tracetools_analysis/processor/ros2.py @@ -47,6 +47,12 @@ class Ros2Handler(EventHandler): self._handle_rcl_node_init, 'ros2:rcl_publisher_init': self._handle_rcl_publisher_init, + 'ros2:rclcpp_publish': + self._handle_rclcpp_publish, + 'ros2:rcl_publish': + self._handle_rcl_publish, + 'ros2:rmw_publish': + self._handle_rmw_publish, 'ros2:rcl_subscription_init': self._handle_rcl_subscription_init, 'ros2:rclcpp_subscription_init': @@ -126,6 +132,28 @@ class Ros2Handler(EventHandler): depth = get_field(event, 'queue_depth') self.data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth) + def _handle_rclcpp_publish( + self, event: Dict, metadata: EventMetadata, + ) -> None: + timestamp = metadata.timestamp + message = get_field(event, 'message') + self.data.add_rclcpp_publish_instance(timestamp, message) + + def _handle_rcl_publish( + self, event: Dict, metadata: EventMetadata, + ) -> None: + handle = get_field(event, 'publisher_handle') + timestamp = metadata.timestamp + message = get_field(event, 'message') + self.data.add_rcl_publish_instance(handle, timestamp, message) + + def _handle_rmw_publish( + self, event: Dict, metadata: EventMetadata, + ) -> None: + timestamp = metadata.timestamp + message = get_field(event, 'message') + self.data.add_rmw_publish_instance(timestamp, message) + def _handle_rcl_subscription_init( self, event: Dict, metadata: EventMetadata, ) -> None: diff --git a/tracetools_analysis/tracetools_analysis/utils/ros2.py b/tracetools_analysis/tracetools_analysis/utils/ros2.py index ad868e8..b650737 100644 --- a/tracetools_analysis/tracetools_analysis/utils/ros2.py +++ b/tracetools_analysis/tracetools_analysis/utils/ros2.py @@ -127,6 +127,56 @@ class Ros2DataModelUtil(DataModelUtil): """Get a list of thread ids corresponding to the nodes.""" return self.data.nodes['tid'].unique().tolist() + def get_rcl_publish_instances(self, topic_name) -> Optional[DataFrame]: + """ + Get rcl publish instances for all publishers with the given topic name. + + :param topic_name: the topic name + :return: dataframe with [publisher handle, publish timestamp, message] columns, + or `None` if topic name not found + """ + # We could have more than one publisher for the topic + publisher_handles = self.data.publishers.loc[ + self.data.publishers['topic_name'] == topic_name + ].index.values.astype(int) + if len(publisher_handles) == 0: + return None + publish_instances = self.data.rcl_publish_instances.loc[ + self.data.rcl_publish_instances['publisher_handle'].isin(publisher_handles) + ] + publish_instances.reset_index(drop=True, inplace=True) + self.convert_time_columns(publish_instances, [], ['timestamp'], True) + return publish_instances + + def get_publish_instances(self) -> DataFrame: + """ + Get all publish instances (rclcpp, rcl, rmw) in a single dataframe. + + The rows are ordered by publish timestamp, so the order will usually be: rclcpp, rcl, rmw. + However, this does not apply to publications from internal publishers, i.e., + publications that originate from below rclcpp (rcl or rmw). + TODO(christophebedard) find heuristic to exclude those + + :return: dataframe with [timestamp, message, layer 'rclcpp'|'rcl'|'rmw', publisher handle] + columns, ordered by timestamp, + and where the publisher handle is only set (non-zero) for 'rcl' publish instances + """ + # Add publisher handle columns with zeros for dataframes that do not have this column, + # otherwise NaN is used and the publisher handle values for rcl are converted to float + rclcpp_instances = self.data.rclcpp_publish_instances.copy() + rclcpp_instances['layer'] = 'rclcpp' + rclcpp_instances['publisher_handle'] = 0 + rcl_instances = self.data.rcl_publish_instances.copy() + rcl_instances['layer'] = 'rcl' + rmw_instances = self.data.rmw_publish_instances.copy() + rmw_instances['layer'] = 'rmw' + rmw_instances['publisher_handle'] = 0 + publish_instances = concat([rclcpp_instances, rcl_instances, rmw_instances], axis=0) + publish_instances.sort_values('timestamp', inplace=True) + publish_instances.reset_index(drop=True, inplace=True) + self.convert_time_columns(publish_instances, [], ['timestamp'], True) + return publish_instances + def get_callback_durations( self, callback_obj: int,