diff --git a/tracetools_analysis/tracetools_analysis/data_model/ros2.py b/tracetools_analysis/tracetools_analysis/data_model/ros2.py index c26c053..65f1fab 100644 --- a/tracetools_analysis/tracetools_analysis/data_model/ros2.py +++ b/tracetools_analysis/tracetools_analysis/data_model/ros2.py @@ -35,8 +35,10 @@ class Ros2DataModel(DataModel): # Objects (one-time events, usually when something is created) self._contexts: DataModelIntermediateStorage = [] self._nodes: DataModelIntermediateStorage = [] - self._publishers: DataModelIntermediateStorage = [] - self._subscriptions: DataModelIntermediateStorage = [] + self._rmw_publishers: DataModelIntermediateStorage = [] + self._rcl_publishers: DataModelIntermediateStorage = [] + self._rmw_subscriptions: DataModelIntermediateStorage = [] + self._rcl_subscriptions: DataModelIntermediateStorage = [] self._subscription_objects: DataModelIntermediateStorage = [] self._services: DataModelIntermediateStorage = [] self._clients: DataModelIntermediateStorage = [] @@ -49,6 +51,9 @@ class Ros2DataModel(DataModel): self._rclcpp_publish_instances: DataModelIntermediateStorage = [] self._rcl_publish_instances: DataModelIntermediateStorage = [] self._rmw_publish_instances: DataModelIntermediateStorage = [] + self._rmw_take_instances: DataModelIntermediateStorage = [] + self._rcl_take_instances: DataModelIntermediateStorage = [] + self._rclcpp_take_instances: DataModelIntermediateStorage = [] self._callback_instances: DataModelIntermediateStorage = [] self._lifecycle_transitions: DataModelIntermediateStorage = [] @@ -74,10 +79,19 @@ class Ros2DataModel(DataModel): 'namespace': namespace, }) - def add_publisher( + def add_rmw_publisher( + self, handle, timestamp, gid, + ) -> None: + self._rmw_publishers.append({ + 'publisher_handle': handle, + 'timestamp': timestamp, + 'gid': gid, + }) + + def add_rcl_publisher( self, handle, timestamp, node_handle, rmw_handle, topic_name, depth ) -> None: - self._publishers.append({ + self._rcl_publishers.append({ 'publisher_handle': handle, 'timestamp': timestamp, 'node_handle': node_handle, @@ -111,10 +125,19 @@ class Ros2DataModel(DataModel): 'message': message, }) + def add_rmw_subscription( + self, handle, timestamp, gid + ) -> None: + self._rcl_subscriptions.append({ + 'subscription_handle': handle, + 'timestamp': timestamp, + 'gid': gid, + }) + def add_rcl_subscription( self, handle, timestamp, node_handle, rmw_handle, topic_name, depth ) -> None: - self._subscriptions.append({ + self._rcl_subscriptions.append({ 'subscription_handle': handle, 'timestamp': timestamp, 'node_handle': node_handle, @@ -201,6 +224,33 @@ class Ros2DataModel(DataModel): 'intra_process': intra_process, }) + def add_rmw_take_instance( + self, subscription_handle, timestamp, message, source_timestamp, taken + ) -> None: + self._rmw_take_instances.append({ + 'subscription_handle': subscription_handle, + 'timestamp': timestamp, + 'message': message, + 'source_timestamp': source_timestamp, + 'taken': taken, + }) + + def add_rcl_take_instance( + self, timestamp, message + ) -> None: + self._rcl_take_instances.append({ + 'timestamp': timestamp, + 'message': message, + }) + + def add_rclcpp_take_instance( + self, timestamp, message + ) -> None: + self._rclcpp_take_instances.append({ + 'timestamp': timestamp, + 'message': message, + }) + def add_lifecycle_state_machine( self, handle, node_handle ) -> None: @@ -228,12 +278,18 @@ class Ros2DataModel(DataModel): self.nodes = pd.DataFrame.from_dict(self._nodes) if self._nodes: self.nodes.set_index('node_handle', inplace=True, drop=True) - self.publishers = pd.DataFrame.from_dict(self._publishers) - if self._publishers: - self.publishers.set_index('publisher_handle', inplace=True, drop=True) - self.subscriptions = pd.DataFrame.from_dict(self._subscriptions) - if self._subscriptions: - self.subscriptions.set_index('subscription_handle', inplace=True, drop=True) + self.rmw_publishers = pd.DataFrame.from_dict(self._rmw_publishers) + if self._rmw_publishers: + self.rmw_publishers.set_index('publisher_handle', inplace=True, drop=True) + self.rcl_publishers = pd.DataFrame.from_dict(self._rcl_publishers) + if self._rcl_publishers: + self.rcl_publishers.set_index('publisher_handle', inplace=True, drop=True) + self.rmw_subscriptions = pd.DataFrame.from_dict(self._rmw_subscriptions) + if self._rmw_subscriptions: + self.rmw_subscriptions.set_index('subscription_handle', inplace=True, drop=True) + self.rcl_subscriptions = pd.DataFrame.from_dict(self._rcl_subscriptions) + if self._rcl_subscriptions: + self.rcl_subscriptions.set_index('subscription_handle', inplace=True, drop=True) self.subscription_objects = pd.DataFrame.from_dict(self._subscription_objects) if self._subscription_objects: self.subscription_objects.set_index('subscription', inplace=True, drop=True) @@ -262,6 +318,9 @@ class Ros2DataModel(DataModel): self.rclcpp_publish_instances = pd.DataFrame.from_dict(self._rclcpp_publish_instances) self.rcl_publish_instances = pd.DataFrame.from_dict(self._rcl_publish_instances) self.rmw_publish_instances = pd.DataFrame.from_dict(self._rmw_publish_instances) + self.rmw_take_instances = pd.DataFrame.from_dict(self._rmw_take_instances) + self.rcl_take_instances = pd.DataFrame.from_dict(self._rcl_take_instances) + self.rclcpp_take_instances = pd.DataFrame.from_dict(self._rclcpp_take_instances) self.callback_instances = pd.DataFrame.from_dict(self._callback_instances) self.lifecycle_transitions = pd.DataFrame.from_dict(self._lifecycle_transitions) @@ -273,20 +332,17 @@ class Ros2DataModel(DataModel): print('Nodes:') print(self.nodes.to_string()) print() - print('Publishers:') - print(self.publishers.to_string()) + print('Publishers (rmw):') + print(self.rmw_publishers.to_string()) print() - print('Publish instances (rclcpp):') - print(self.rclcpp_publish_instances.to_string()) + print('Publishers (rcl):') + print(self.rcl_publishers.to_string()) print() - print('Publish instances (rcl):') - print(self.rcl_publish_instances.to_string()) + print('Subscriptions (rmw):') + print(self.rmw_subscriptions.to_string()) print() - print('Publish instances (rmw):') - print(self.rmw_publish_instances.to_string()) - print() - print('Subscriptions:') - print(self.subscriptions.to_string()) + print('Subscriptions (rcl):') + print(self.rcl_subscriptions.to_string()) print() print('Subscription objects:') print(self.subscription_objects.to_string()) @@ -312,6 +368,24 @@ class Ros2DataModel(DataModel): print('Callback instances:') print(self.callback_instances.to_string()) print() + print('Publish instances (rclcpp):') + print(self.rclcpp_publish_instances.to_string()) + print() + print('Publish instances (rcl):') + print(self.rcl_publish_instances.to_string()) + print() + print('Publish instances (rmw):') + print(self.rmw_publish_instances.to_string()) + print() + print('Take instances (rmw):') + print(self.rmw_take_instances.to_string()) + print() + print('Take instances (rcl):') + print(self.rcl_take_instances.to_string()) + print() + print('Take instances (rclcpp):') + print(self.rclcpp_take_instances.to_string()) + print() print('Lifecycle state machines:') print(self.lifecycle_state_machines.to_string()) print() diff --git a/tracetools_analysis/tracetools_analysis/processor/ros2.py b/tracetools_analysis/tracetools_analysis/processor/ros2.py index 9ce9ed1..8e69965 100644 --- a/tracetools_analysis/tracetools_analysis/processor/ros2.py +++ b/tracetools_analysis/tracetools_analysis/processor/ros2.py @@ -45,6 +45,8 @@ class Ros2Handler(EventHandler): self._handle_rcl_init, 'ros2:rcl_node_init': self._handle_rcl_node_init, + 'ros2:rmw_publisher_init': + self._handle_rmw_publisher_init, 'ros2:rcl_publisher_init': self._handle_rcl_publisher_init, 'ros2:rclcpp_publish': @@ -53,12 +55,20 @@ class Ros2Handler(EventHandler): self._handle_rcl_publish, 'ros2:rmw_publish': self._handle_rmw_publish, + 'ros2:rmw_subscription_init': + self._handle_rmw_subscription_init, 'ros2:rcl_subscription_init': self._handle_rcl_subscription_init, 'ros2:rclcpp_subscription_init': self._handle_rclcpp_subscription_init, 'ros2:rclcpp_subscription_callback_added': self._handle_rclcpp_subscription_callback_added, + 'ros2:rmw_take': + self._handle_rmw_take, + 'ros2:rcl_take': + self._handle_rcl_take, + 'ros2:rclcpp_take': + self._handle_rclcpp_take, 'ros2:rcl_service_init': self._handle_rcl_service_init, 'ros2:rclcpp_service_callback_added': @@ -121,6 +131,14 @@ class Ros2Handler(EventHandler): namespace = get_field(event, 'namespace') self.data.add_node(handle, timestamp, tid, rmw_handle, name, namespace) + def _handle_rmw_publisher_init( + self, event: Dict, metadata: EventMetadata, + ) -> None: + handle = get_field(event, 'rmw_publisher_handle') + timestamp = metadata.timestamp + gid = get_field(event, 'gid') + self.data.add_rmw_publisher(handle, timestamp, gid) + def _handle_rcl_publisher_init( self, event: Dict, metadata: EventMetadata, ) -> None: @@ -130,7 +148,7 @@ class Ros2Handler(EventHandler): rmw_handle = get_field(event, 'rmw_publisher_handle') topic_name = get_field(event, 'topic_name') depth = get_field(event, 'queue_depth') - self.data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth) + self.data.add_rcl_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth) def _handle_rclcpp_publish( self, event: Dict, metadata: EventMetadata, @@ -154,6 +172,14 @@ class Ros2Handler(EventHandler): message = get_field(event, 'message') self.data.add_rmw_publish_instance(timestamp, message) + def _handle_rmw_subscription_init( + self, event: Dict, metadata: EventMetadata, + ) -> None: + handle = get_field(event, 'rmw_subscription_handle') + timestamp = metadata.timestamp + gid = get_field(event, 'gid') + self.data.add_rmw_subscription(handle, timestamp, gid) + def _handle_rcl_subscription_init( self, event: Dict, metadata: EventMetadata, ) -> None: @@ -183,6 +209,32 @@ class Ros2Handler(EventHandler): callback_object = get_field(event, 'callback') self.data.add_callback_object(subscription_pointer, timestamp, callback_object) + def _handle_rmw_take( + self, event: Dict, metadata: EventMetadata, + ) -> None: + subscription_handle = get_field(event, 'rmw_subscription_handle') + timestamp = metadata.timestamp + message = get_field(event, 'message') + source_timestamp = get_field(event, 'source_timestamp') + taken = bool(get_field(event, 'taken')) + self.data.add_rmw_take_instance( + subscription_handle, timestamp, message, source_timestamp, taken + ) + + def _handle_rcl_take( + self, event: Dict, metadata: EventMetadata, + ) -> None: + timestamp = metadata.timestamp + message = get_field(event, 'message') + self.data.add_rcl_take_instance(timestamp, message) + + def _handle_rclcpp_take( + self, event: Dict, metadata: EventMetadata, + ) -> None: + timestamp = metadata.timestamp + message = get_field(event, 'message') + self.data.add_rclcpp_take_instance(timestamp, message) + def _handle_rcl_service_init( self, event: Dict, metadata: EventMetadata, ) -> None: diff --git a/tracetools_analysis/tracetools_analysis/utils/ros2.py b/tracetools_analysis/tracetools_analysis/utils/ros2.py index b650737..13f69f7 100644 --- a/tracetools_analysis/tracetools_analysis/utils/ros2.py +++ b/tracetools_analysis/tracetools_analysis/utils/ros2.py @@ -136,8 +136,8 @@ class Ros2DataModelUtil(DataModelUtil): or `None` if topic name not found """ # We could have more than one publisher for the topic - publisher_handles = self.data.publishers.loc[ - self.data.publishers['topic_name'] == topic_name + publisher_handles = self.data.rcl_publishers.loc[ + self.data.rcl_publishers['topic_name'] == topic_name ].index.values.astype(int) if len(publisher_handles) == 0: return None @@ -155,7 +155,7 @@ class Ros2DataModelUtil(DataModelUtil): The rows are ordered by publish timestamp, so the order will usually be: rclcpp, rcl, rmw. However, this does not apply to publications from internal publishers, i.e., publications that originate from below rclcpp (rcl or rmw). - TODO(christophebedard) find heuristic to exclude those + TODO(christophebedard) find heuristic to exclude those? :return: dataframe with [timestamp, message, layer 'rclcpp'|'rcl'|'rmw', publisher handle] columns, ordered by timestamp, @@ -177,6 +177,44 @@ class Ros2DataModelUtil(DataModelUtil): self.convert_time_columns(publish_instances, [], ['timestamp'], True) return publish_instances + def get_take_instances(self) -> DataFrame: + """ + Get all take instances (rmw, rcl, rclcpp) in a single dataframe. + + The rows are ordered by take timestamp, so the order will usually be: rmw, rcl, rclcpp. + However, thsi does not apply to takes from internal subscriptions, i.e., + takes that originate from below rclcpp (rcl or rmw). + TODO(christophebedard) find heuristic to exclude those? + + :return: dataframe with + [timestamp, message, source timestamp, + layer 'rmw'|'rcl'|'rmw', rmw subscription handle, taken] + columns, ordered by timestamp, + and where the rmw subscription handle, source timestamp, and taken flag are only set + (non-zero, non-False) for 'rmw' take instances + """ + rmw_instances = self.data.rmw_take_instances.copy() + rmw_instances['layer'] = 'rmw' + rmw_instances.rename( + columns={'subscription_handle': 'rmw_subscription_handle'}, + inplace=True, + ) + rcl_instances = self.data.rcl_take_instances.copy() + rcl_instances['layer'] = 'rcl' + rcl_instances['rmw_subscription_handle'] = 0 + rcl_instances['source_timestamp'] = 0 + rcl_instances['taken'] = False + rclcpp_instances = self.data.rclcpp_take_instances.copy() + rclcpp_instances['layer'] = 'rclcpp' + rclcpp_instances['rmw_subscription_handle'] = 0 + rclcpp_instances['source_timestamp'] = 0 + rclcpp_instances['taken'] = False + take_instances = concat([rmw_instances, rcl_instances, rclcpp_instances], axis=0) + take_instances.sort_values('timestamp', inplace=True) + take_instances.reset_index(drop=True, inplace=True) + self.convert_time_columns(take_instances, [], ['timestamp', 'source_timestamp'], True) + return take_instances + def get_callback_durations( self, callback_obj: int, @@ -250,7 +288,7 @@ class Ros2DataModelUtil(DataModelUtil): if reference in self.data.timers.index: type_name = 'Timer' info = self.get_timer_handle_info(reference) - elif reference in self.data.publishers.index: + elif reference in self.data.rcl_publishers.index: type_name = 'Publisher' info = self.get_publisher_handle_info(reference) elif reference in self.data.subscription_objects.index: @@ -300,14 +338,14 @@ class Ros2DataModelUtil(DataModelUtil): :param publisher_handle: the publisher handle value :return: a dictionary with name:value info, or `None` if it fails """ - if publisher_handle not in self.data.publishers.index: + if publisher_handle not in self.data.rcl_publishers.index: return None - node_handle = self.data.publishers.loc[publisher_handle, 'node_handle'] + node_handle = self.data.rcl_publishers.loc[publisher_handle, 'node_handle'] node_handle_info = self.get_node_handle_info(node_handle) if node_handle_info is None: return None - topic_name = self.data.publishers.loc[publisher_handle, 'topic_name'] + topic_name = self.data.rcl_publishers.loc[publisher_handle, 'topic_name'] publisher_info = {'topic': topic_name} return {**node_handle_info, **publisher_info} @@ -338,7 +376,7 @@ class Ros2DataModelUtil(DataModelUtil): columns=['timestamp'], axis=1, ) - subscriptions_simple = self.data.subscriptions.drop( + subscriptions_simple = self.data.rcl_subscriptions.drop( columns=['timestamp', 'rmw_handle'], inplace=False, )