Support publishing instrumentation

Signed-off-by: Christophe Bedard <bedard.christophe@gmail.com>
This commit is contained in:
Christophe Bedard 2021-06-22 19:31:21 -04:00
parent 9c535d1261
commit 7f1a3eec80
3 changed files with 118 additions and 0 deletions

View file

@ -46,6 +46,9 @@ class Ros2DataModel(DataModel):
self._callback_symbols: DataModelIntermediateStorage = []
self._lifecycle_state_machines: DataModelIntermediateStorage = []
# Events (multiple instances, may not have a meaningful index)
self._rclcpp_publish_instances: DataModelIntermediateStorage = []
self._rcl_publish_instances: DataModelIntermediateStorage = []
self._rmw_publish_instances: DataModelIntermediateStorage = []
self._callback_instances: DataModelIntermediateStorage = []
self._lifecycle_transitions: DataModelIntermediateStorage = []
@ -83,6 +86,31 @@ class Ros2DataModel(DataModel):
'depth': depth,
})
def add_rclcpp_publish_instance(
self, timestamp, message,
) -> None:
self._rclcpp_publish_instances.append({
'timestamp': timestamp,
'message': message,
})
def add_rcl_publish_instance(
self, publisher_handle, timestamp, message,
) -> None:
self._rcl_publish_instances.append({
'publisher_handle': publisher_handle,
'timestamp': timestamp,
'message': message,
})
def add_rmw_publish_instance(
self, timestamp, message,
) -> None:
self._rmw_publish_instances.append({
'timestamp': timestamp,
'message': message,
})
def add_rcl_subscription(
self, handle, timestamp, node_handle, rmw_handle, topic_name, depth
) -> None:
@ -231,6 +259,9 @@ class Ros2DataModel(DataModel):
if self._lifecycle_state_machines:
self.lifecycle_state_machines.set_index(
'state_machine_handle', inplace=True, drop=True)
self.rclcpp_publish_instances = pd.DataFrame.from_dict(self._rclcpp_publish_instances)
self.rcl_publish_instances = pd.DataFrame.from_dict(self._rcl_publish_instances)
self.rmw_publish_instances = pd.DataFrame.from_dict(self._rmw_publish_instances)
self.callback_instances = pd.DataFrame.from_dict(self._callback_instances)
self.lifecycle_transitions = pd.DataFrame.from_dict(self._lifecycle_transitions)
@ -245,6 +276,15 @@ class Ros2DataModel(DataModel):
print('Publishers:')
print(self.publishers.to_string())
print()
print('Publish instances (rclcpp):')
print(self.rclcpp_publish_instances.to_string())
print()
print('Publish instances (rcl):')
print(self.rcl_publish_instances.to_string())
print()
print('Publish instances (rmw):')
print(self.rmw_publish_instances.to_string())
print()
print('Subscriptions:')
print(self.subscriptions.to_string())
print()

View file

@ -47,6 +47,12 @@ class Ros2Handler(EventHandler):
self._handle_rcl_node_init,
'ros2:rcl_publisher_init':
self._handle_rcl_publisher_init,
'ros2:rclcpp_publish':
self._handle_rclcpp_publish,
'ros2:rcl_publish':
self._handle_rcl_publish,
'ros2:rmw_publish':
self._handle_rmw_publish,
'ros2:rcl_subscription_init':
self._handle_rcl_subscription_init,
'ros2:rclcpp_subscription_init':
@ -126,6 +132,28 @@ class Ros2Handler(EventHandler):
depth = get_field(event, 'queue_depth')
self.data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
def _handle_rclcpp_publish(
self, event: Dict, metadata: EventMetadata,
) -> None:
timestamp = metadata.timestamp
message = get_field(event, 'message')
self.data.add_rclcpp_publish_instance(timestamp, message)
def _handle_rcl_publish(
self, event: Dict, metadata: EventMetadata,
) -> None:
handle = get_field(event, 'publisher_handle')
timestamp = metadata.timestamp
message = get_field(event, 'message')
self.data.add_rcl_publish_instance(handle, timestamp, message)
def _handle_rmw_publish(
self, event: Dict, metadata: EventMetadata,
) -> None:
timestamp = metadata.timestamp
message = get_field(event, 'message')
self.data.add_rmw_publish_instance(timestamp, message)
def _handle_rcl_subscription_init(
self, event: Dict, metadata: EventMetadata,
) -> None:

View file

@ -127,6 +127,56 @@ class Ros2DataModelUtil(DataModelUtil):
"""Get a list of thread ids corresponding to the nodes."""
return self.data.nodes['tid'].unique().tolist()
def get_rcl_publish_instances(self, topic_name) -> Optional[DataFrame]:
"""
Get rcl publish instances for all publishers with the given topic name.
:param topic_name: the topic name
:return: dataframe with [publisher handle, publish timestamp, message] columns,
or `None` if topic name not found
"""
# We could have more than one publisher for the topic
publisher_handles = self.data.publishers.loc[
self.data.publishers['topic_name'] == topic_name
].index.values.astype(int)
if len(publisher_handles) == 0:
return None
publish_instances = self.data.rcl_publish_instances.loc[
self.data.rcl_publish_instances['publisher_handle'].isin(publisher_handles)
]
publish_instances.reset_index(drop=True, inplace=True)
self.convert_time_columns(publish_instances, [], ['timestamp'], True)
return publish_instances
def get_publish_instances(self) -> DataFrame:
"""
Get all publish instances (rclcpp, rcl, rmw) in a single dataframe.
The rows are ordered by publish timestamp, so the order will usually be: rclcpp, rcl, rmw.
However, this does not apply to publications from internal publishers, i.e.,
publications that originate from below rclcpp (rcl or rmw).
TODO(christophebedard) find heuristic to exclude those
:return: dataframe with [timestamp, message, layer 'rclcpp'|'rcl'|'rmw', publisher handle]
columns, ordered by timestamp,
and where the publisher handle is only set (non-zero) for 'rcl' publish instances
"""
# Add publisher handle columns with zeros for dataframes that do not have this column,
# otherwise NaN is used and the publisher handle values for rcl are converted to float
rclcpp_instances = self.data.rclcpp_publish_instances.copy()
rclcpp_instances['layer'] = 'rclcpp'
rclcpp_instances['publisher_handle'] = 0
rcl_instances = self.data.rcl_publish_instances.copy()
rcl_instances['layer'] = 'rcl'
rmw_instances = self.data.rmw_publish_instances.copy()
rmw_instances['layer'] = 'rmw'
rmw_instances['publisher_handle'] = 0
publish_instances = concat([rclcpp_instances, rcl_instances, rmw_instances], axis=0)
publish_instances.sort_values('timestamp', inplace=True)
publish_instances.reset_index(drop=True, inplace=True)
self.convert_time_columns(publish_instances, [], ['timestamp'], True)
return publish_instances
def get_callback_durations(
self,
callback_obj: int,