Support rmw pub/sub init and take instrumentation

Signed-off-by: Christophe Bedard <bedard.christophe@gmail.com>
This commit is contained in:
Christophe Bedard 2021-10-01 13:37:49 -04:00
parent 132c159764
commit bffbe1b10e
3 changed files with 195 additions and 31 deletions

View file

@ -35,8 +35,10 @@ class Ros2DataModel(DataModel):
# Objects (one-time events, usually when something is created)
self._contexts: DataModelIntermediateStorage = []
self._nodes: DataModelIntermediateStorage = []
self._publishers: DataModelIntermediateStorage = []
self._subscriptions: DataModelIntermediateStorage = []
self._rmw_publishers: DataModelIntermediateStorage = []
self._rcl_publishers: DataModelIntermediateStorage = []
self._rmw_subscriptions: DataModelIntermediateStorage = []
self._rcl_subscriptions: DataModelIntermediateStorage = []
self._subscription_objects: DataModelIntermediateStorage = []
self._services: DataModelIntermediateStorage = []
self._clients: DataModelIntermediateStorage = []
@ -49,6 +51,9 @@ class Ros2DataModel(DataModel):
self._rclcpp_publish_instances: DataModelIntermediateStorage = []
self._rcl_publish_instances: DataModelIntermediateStorage = []
self._rmw_publish_instances: DataModelIntermediateStorage = []
self._rmw_take_instances: DataModelIntermediateStorage = []
self._rcl_take_instances: DataModelIntermediateStorage = []
self._rclcpp_take_instances: DataModelIntermediateStorage = []
self._callback_instances: DataModelIntermediateStorage = []
self._lifecycle_transitions: DataModelIntermediateStorage = []
@ -74,10 +79,19 @@ class Ros2DataModel(DataModel):
'namespace': namespace,
})
def add_publisher(
def add_rmw_publisher(
self, handle, timestamp, gid,
) -> None:
self._rmw_publishers.append({
'publisher_handle': handle,
'timestamp': timestamp,
'gid': gid,
})
def add_rcl_publisher(
self, handle, timestamp, node_handle, rmw_handle, topic_name, depth
) -> None:
self._publishers.append({
self._rcl_publishers.append({
'publisher_handle': handle,
'timestamp': timestamp,
'node_handle': node_handle,
@ -111,10 +125,19 @@ class Ros2DataModel(DataModel):
'message': message,
})
def add_rmw_subscription(
self, handle, timestamp, gid
) -> None:
self._rcl_subscriptions.append({
'subscription_handle': handle,
'timestamp': timestamp,
'gid': gid,
})
def add_rcl_subscription(
self, handle, timestamp, node_handle, rmw_handle, topic_name, depth
) -> None:
self._subscriptions.append({
self._rcl_subscriptions.append({
'subscription_handle': handle,
'timestamp': timestamp,
'node_handle': node_handle,
@ -201,6 +224,33 @@ class Ros2DataModel(DataModel):
'intra_process': intra_process,
})
def add_rmw_take_instance(
self, subscription_handle, timestamp, message, source_timestamp, taken
) -> None:
self._rmw_take_instances.append({
'subscription_handle': subscription_handle,
'timestamp': timestamp,
'message': message,
'source_timestamp': source_timestamp,
'taken': taken,
})
def add_rcl_take_instance(
self, timestamp, message
) -> None:
self._rcl_take_instances.append({
'timestamp': timestamp,
'message': message,
})
def add_rclcpp_take_instance(
self, timestamp, message
) -> None:
self._rclcpp_take_instances.append({
'timestamp': timestamp,
'message': message,
})
def add_lifecycle_state_machine(
self, handle, node_handle
) -> None:
@ -228,12 +278,18 @@ class Ros2DataModel(DataModel):
self.nodes = pd.DataFrame.from_dict(self._nodes)
if self._nodes:
self.nodes.set_index('node_handle', inplace=True, drop=True)
self.publishers = pd.DataFrame.from_dict(self._publishers)
if self._publishers:
self.publishers.set_index('publisher_handle', inplace=True, drop=True)
self.subscriptions = pd.DataFrame.from_dict(self._subscriptions)
if self._subscriptions:
self.subscriptions.set_index('subscription_handle', inplace=True, drop=True)
self.rmw_publishers = pd.DataFrame.from_dict(self._rmw_publishers)
if self._rmw_publishers:
self.rmw_publishers.set_index('publisher_handle', inplace=True, drop=True)
self.rcl_publishers = pd.DataFrame.from_dict(self._rcl_publishers)
if self._rcl_publishers:
self.rcl_publishers.set_index('publisher_handle', inplace=True, drop=True)
self.rmw_subscriptions = pd.DataFrame.from_dict(self._rmw_subscriptions)
if self._rmw_subscriptions:
self.rmw_subscriptions.set_index('subscription_handle', inplace=True, drop=True)
self.rcl_subscriptions = pd.DataFrame.from_dict(self._rcl_subscriptions)
if self._rcl_subscriptions:
self.rcl_subscriptions.set_index('subscription_handle', inplace=True, drop=True)
self.subscription_objects = pd.DataFrame.from_dict(self._subscription_objects)
if self._subscription_objects:
self.subscription_objects.set_index('subscription', inplace=True, drop=True)
@ -262,6 +318,9 @@ class Ros2DataModel(DataModel):
self.rclcpp_publish_instances = pd.DataFrame.from_dict(self._rclcpp_publish_instances)
self.rcl_publish_instances = pd.DataFrame.from_dict(self._rcl_publish_instances)
self.rmw_publish_instances = pd.DataFrame.from_dict(self._rmw_publish_instances)
self.rmw_take_instances = pd.DataFrame.from_dict(self._rmw_take_instances)
self.rcl_take_instances = pd.DataFrame.from_dict(self._rcl_take_instances)
self.rclcpp_take_instances = pd.DataFrame.from_dict(self._rclcpp_take_instances)
self.callback_instances = pd.DataFrame.from_dict(self._callback_instances)
self.lifecycle_transitions = pd.DataFrame.from_dict(self._lifecycle_transitions)
@ -273,20 +332,17 @@ class Ros2DataModel(DataModel):
print('Nodes:')
print(self.nodes.to_string())
print()
print('Publishers:')
print(self.publishers.to_string())
print('Publishers (rmw):')
print(self.rmw_publishers.to_string())
print()
print('Publish instances (rclcpp):')
print(self.rclcpp_publish_instances.to_string())
print('Publishers (rcl):')
print(self.rcl_publishers.to_string())
print()
print('Publish instances (rcl):')
print(self.rcl_publish_instances.to_string())
print('Subscriptions (rmw):')
print(self.rmw_subscriptions.to_string())
print()
print('Publish instances (rmw):')
print(self.rmw_publish_instances.to_string())
print()
print('Subscriptions:')
print(self.subscriptions.to_string())
print('Subscriptions (rcl):')
print(self.rcl_subscriptions.to_string())
print()
print('Subscription objects:')
print(self.subscription_objects.to_string())
@ -312,6 +368,24 @@ class Ros2DataModel(DataModel):
print('Callback instances:')
print(self.callback_instances.to_string())
print()
print('Publish instances (rclcpp):')
print(self.rclcpp_publish_instances.to_string())
print()
print('Publish instances (rcl):')
print(self.rcl_publish_instances.to_string())
print()
print('Publish instances (rmw):')
print(self.rmw_publish_instances.to_string())
print()
print('Take instances (rmw):')
print(self.rmw_take_instances.to_string())
print()
print('Take instances (rcl):')
print(self.rcl_take_instances.to_string())
print()
print('Take instances (rclcpp):')
print(self.rclcpp_take_instances.to_string())
print()
print('Lifecycle state machines:')
print(self.lifecycle_state_machines.to_string())
print()

View file

@ -45,6 +45,8 @@ class Ros2Handler(EventHandler):
self._handle_rcl_init,
'ros2:rcl_node_init':
self._handle_rcl_node_init,
'ros2:rmw_publisher_init':
self._handle_rmw_publisher_init,
'ros2:rcl_publisher_init':
self._handle_rcl_publisher_init,
'ros2:rclcpp_publish':
@ -53,12 +55,20 @@ class Ros2Handler(EventHandler):
self._handle_rcl_publish,
'ros2:rmw_publish':
self._handle_rmw_publish,
'ros2:rmw_subscription_init':
self._handle_rmw_subscription_init,
'ros2:rcl_subscription_init':
self._handle_rcl_subscription_init,
'ros2:rclcpp_subscription_init':
self._handle_rclcpp_subscription_init,
'ros2:rclcpp_subscription_callback_added':
self._handle_rclcpp_subscription_callback_added,
'ros2:rmw_take':
self._handle_rmw_take,
'ros2:rcl_take':
self._handle_rcl_take,
'ros2:rclcpp_take':
self._handle_rclcpp_take,
'ros2:rcl_service_init':
self._handle_rcl_service_init,
'ros2:rclcpp_service_callback_added':
@ -121,6 +131,14 @@ class Ros2Handler(EventHandler):
namespace = get_field(event, 'namespace')
self.data.add_node(handle, timestamp, tid, rmw_handle, name, namespace)
def _handle_rmw_publisher_init(
self, event: Dict, metadata: EventMetadata,
) -> None:
handle = get_field(event, 'rmw_publisher_handle')
timestamp = metadata.timestamp
gid = get_field(event, 'gid')
self.data.add_rmw_publisher(handle, timestamp, gid)
def _handle_rcl_publisher_init(
self, event: Dict, metadata: EventMetadata,
) -> None:
@ -130,7 +148,7 @@ class Ros2Handler(EventHandler):
rmw_handle = get_field(event, 'rmw_publisher_handle')
topic_name = get_field(event, 'topic_name')
depth = get_field(event, 'queue_depth')
self.data.add_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
self.data.add_rcl_publisher(handle, timestamp, node_handle, rmw_handle, topic_name, depth)
def _handle_rclcpp_publish(
self, event: Dict, metadata: EventMetadata,
@ -154,6 +172,14 @@ class Ros2Handler(EventHandler):
message = get_field(event, 'message')
self.data.add_rmw_publish_instance(timestamp, message)
def _handle_rmw_subscription_init(
self, event: Dict, metadata: EventMetadata,
) -> None:
handle = get_field(event, 'rmw_subscription_handle')
timestamp = metadata.timestamp
gid = get_field(event, 'gid')
self.data.add_rmw_subscription(handle, timestamp, gid)
def _handle_rcl_subscription_init(
self, event: Dict, metadata: EventMetadata,
) -> None:
@ -183,6 +209,32 @@ class Ros2Handler(EventHandler):
callback_object = get_field(event, 'callback')
self.data.add_callback_object(subscription_pointer, timestamp, callback_object)
def _handle_rmw_take(
self, event: Dict, metadata: EventMetadata,
) -> None:
subscription_handle = get_field(event, 'rmw_subscription_handle')
timestamp = metadata.timestamp
message = get_field(event, 'message')
source_timestamp = get_field(event, 'source_timestamp')
taken = bool(get_field(event, 'taken'))
self.data.add_rmw_take_instance(
subscription_handle, timestamp, message, source_timestamp, taken
)
def _handle_rcl_take(
self, event: Dict, metadata: EventMetadata,
) -> None:
timestamp = metadata.timestamp
message = get_field(event, 'message')
self.data.add_rcl_take_instance(timestamp, message)
def _handle_rclcpp_take(
self, event: Dict, metadata: EventMetadata,
) -> None:
timestamp = metadata.timestamp
message = get_field(event, 'message')
self.data.add_rclcpp_take_instance(timestamp, message)
def _handle_rcl_service_init(
self, event: Dict, metadata: EventMetadata,
) -> None:

View file

@ -136,8 +136,8 @@ class Ros2DataModelUtil(DataModelUtil):
or `None` if topic name not found
"""
# We could have more than one publisher for the topic
publisher_handles = self.data.publishers.loc[
self.data.publishers['topic_name'] == topic_name
publisher_handles = self.data.rcl_publishers.loc[
self.data.rcl_publishers['topic_name'] == topic_name
].index.values.astype(int)
if len(publisher_handles) == 0:
return None
@ -155,7 +155,7 @@ class Ros2DataModelUtil(DataModelUtil):
The rows are ordered by publish timestamp, so the order will usually be: rclcpp, rcl, rmw.
However, this does not apply to publications from internal publishers, i.e.,
publications that originate from below rclcpp (rcl or rmw).
TODO(christophebedard) find heuristic to exclude those
TODO(christophebedard) find heuristic to exclude those?
:return: dataframe with [timestamp, message, layer 'rclcpp'|'rcl'|'rmw', publisher handle]
columns, ordered by timestamp,
@ -177,6 +177,44 @@ class Ros2DataModelUtil(DataModelUtil):
self.convert_time_columns(publish_instances, [], ['timestamp'], True)
return publish_instances
def get_take_instances(self) -> DataFrame:
"""
Get all take instances (rmw, rcl, rclcpp) in a single dataframe.
The rows are ordered by take timestamp, so the order will usually be: rmw, rcl, rclcpp.
However, thsi does not apply to takes from internal subscriptions, i.e.,
takes that originate from below rclcpp (rcl or rmw).
TODO(christophebedard) find heuristic to exclude those?
:return: dataframe with
[timestamp, message, source timestamp,
layer 'rmw'|'rcl'|'rmw', rmw subscription handle, taken]
columns, ordered by timestamp,
and where the rmw subscription handle, source timestamp, and taken flag are only set
(non-zero, non-False) for 'rmw' take instances
"""
rmw_instances = self.data.rmw_take_instances.copy()
rmw_instances['layer'] = 'rmw'
rmw_instances.rename(
columns={'subscription_handle': 'rmw_subscription_handle'},
inplace=True,
)
rcl_instances = self.data.rcl_take_instances.copy()
rcl_instances['layer'] = 'rcl'
rcl_instances['rmw_subscription_handle'] = 0
rcl_instances['source_timestamp'] = 0
rcl_instances['taken'] = False
rclcpp_instances = self.data.rclcpp_take_instances.copy()
rclcpp_instances['layer'] = 'rclcpp'
rclcpp_instances['rmw_subscription_handle'] = 0
rclcpp_instances['source_timestamp'] = 0
rclcpp_instances['taken'] = False
take_instances = concat([rmw_instances, rcl_instances, rclcpp_instances], axis=0)
take_instances.sort_values('timestamp', inplace=True)
take_instances.reset_index(drop=True, inplace=True)
self.convert_time_columns(take_instances, [], ['timestamp', 'source_timestamp'], True)
return take_instances
def get_callback_durations(
self,
callback_obj: int,
@ -250,7 +288,7 @@ class Ros2DataModelUtil(DataModelUtil):
if reference in self.data.timers.index:
type_name = 'Timer'
info = self.get_timer_handle_info(reference)
elif reference in self.data.publishers.index:
elif reference in self.data.rcl_publishers.index:
type_name = 'Publisher'
info = self.get_publisher_handle_info(reference)
elif reference in self.data.subscription_objects.index:
@ -300,14 +338,14 @@ class Ros2DataModelUtil(DataModelUtil):
:param publisher_handle: the publisher handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if publisher_handle not in self.data.publishers.index:
if publisher_handle not in self.data.rcl_publishers.index:
return None
node_handle = self.data.publishers.loc[publisher_handle, 'node_handle']
node_handle = self.data.rcl_publishers.loc[publisher_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
if node_handle_info is None:
return None
topic_name = self.data.publishers.loc[publisher_handle, 'topic_name']
topic_name = self.data.rcl_publishers.loc[publisher_handle, 'topic_name']
publisher_info = {'topic': topic_name}
return {**node_handle_info, **publisher_info}
@ -338,7 +376,7 @@ class Ros2DataModelUtil(DataModelUtil):
columns=['timestamp'],
axis=1,
)
subscriptions_simple = self.data.subscriptions.drop(
subscriptions_simple = self.data.rcl_subscriptions.drop(
columns=['timestamp', 'rmw_handle'],
inplace=False,
)