Add util methods to get information on a callback object

This commit is contained in:
Christophe Bedard 2019-06-27 14:55:54 +02:00
parent 579e064b54
commit a816736c43
3 changed files with 156 additions and 5 deletions

View file

@ -68,7 +68,8 @@ class DataModel():
self.clients.set_index(['client_handle'], inplace=True, drop=True)
self.timers = pd.DataFrame(columns=['timer_handle',
'timestamp',
'period'])
'period',
'tid'])
self.timers.set_index(['timer_handle'], inplace=True, drop=True)
self.callback_objects = pd.DataFrame(columns=['handle',
@ -117,9 +118,9 @@ class DataModel():
self.clients.loc[handle] = [timestamp, node_handle, rmw_handle, service_name]
def add_timer(
self, handle, timestamp, period
self, handle, timestamp, period, tid
) -> None:
self.timers.loc[handle] = [timestamp, period]
self.timers.loc[handle] = [timestamp, period, tid]
def add_callback_object(
self, handle, timestamp, callback_object
@ -165,3 +166,9 @@ class DataModel():
print()
print(f'Callback instances:\n{self.callback_instances.to_string()}')
print('==================================================')
timers_info = self.timers.merge(self.nodes, on='tid', right_index=True)
print(timers_info.to_string())
print()
subscriptions_info = self.subscriptions.merge(self.nodes, left_on='node_handle', right_index=True)
print(subscriptions_info.to_string())

View file

@ -155,7 +155,8 @@ class Ros2Processor(EventHandler):
handle = get_field(event, 'timer_handle')
timestamp = metadata.timestamp
period = get_field(event, 'period')
self._data.add_timer(handle, timestamp, period)
tid = metadata.tid
self._data.add_timer(handle, timestamp, period, tid)
def _handle_rclcpp_timer_callback_added(
self, event: Dict, metadata: EventMetadata

View file

@ -14,7 +14,9 @@
"""Module for data model utility class."""
from typing import Any
from typing import Mapping
from typing import Union
from pandas import DataFrame
@ -54,10 +56,151 @@ class DataModelUtil():
"""
Get durations of callback instances for a given callback object.
:param callback_obj: a callback object value
:param callback_obj: the callback object value
:return: a dataframe containing the durations of all callback instances for that object
"""
return self._data.callback_instances.loc[
self._data.callback_instances.loc[:, 'callback_object'] == callback_obj,
:
]
def get_callback_owner_info(self, callback_obj: int) -> Union[str, None]:
"""
Get information about the owner of a callback.
Depending on the type of callback, it will give different kinds of info:
* subscription: node name, topic name
* timer: tid, period of timer
* service/client: node name, service name
:param callback_obj: the callback object value
:return: information about the owner of the callback, or `None` if it fails
"""
# Get handle corresponding to callback object
handle = self._data.callback_objects.loc[
self._data.callback_objects['callback_object'] == callback_obj
].index.values.astype(int)[0]
type_name = None
info = None
# Check if it's a timer first (since it's slightly different than the others)
if handle in self._data.timers.index:
type_name = 'Timer'
info = self.get_timer_handle_info(handle)
elif handle in self._data.publishers.index:
type_name = 'Publisher'
info = self.get_publisher_handle_info(handle)
elif handle in self._data.subscriptions.index:
type_name = 'Subscription'
info = self.get_subscription_handle_info(handle)
elif handle in self._data.services.index:
type_name = 'Service'
info = self.get_subscription_handle_info(handle)
elif handle in self._data.clients.index:
type_name = 'Client'
info = self.get_client_handle_info(handle)
if info is not None:
info = f'{type_name} -- {self.format_info_dict(info)}'
return info
def get_timer_handle_info(self, timer_handle: int) -> Union[Mapping[str, Any], None]:
"""
Get information about the owner of a timer.
:param timer_handle: the timer handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
timers_info = self._data.timers.merge(self._data.nodes, on='tid', right_index=True)
if timer_handle not in timers_info.index:
return None
tid = timers_info.loc[timer_handle, 'tid']
period_ns = timers_info.loc[timer_handle, 'period']
period_ms = period_ns / 1000000.0
return {'tid': tid, 'period': f'{period_ms} ms'}
def get_publisher_handle_info(self, publisher_handle: int) -> Union[Mapping[str, Any], None]:
"""
Get information about a publisher handle.
:param publisher_handle: the publisher handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if publisher_handle not in self._data.publishers.index:
return None
node_handle = self._data.publishers.loc[publisher_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
topic_name = self._data.publishers.loc[publisher_handle, 'topic_name']
publisher_info = {'topic': topic_name}
return {**node_handle_info, **publisher_info}
def get_subscription_handle_info(self, subscription_handle: int) -> Union[Mapping[str, Any], None]:
"""
Get information about a subscription handle.
:param subscription_handle: the subscription handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
subscriptions_info = self._data.subscriptions.merge(
self._data.nodes,
left_on='node_handle',
right_index=True)
if subscription_handle not in self._data.subscriptions.index:
return None
node_handle = subscriptions_info.loc[subscription_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
topic_name = subscriptions_info.loc[subscription_handle, 'topic_name']
subscription_info = {'topic': topic_name}
return {**node_handle_info, **subscription_info}
def get_service_handle_info(self, service_handle: int) -> Union[Mapping[str, Any], None]:
"""
Get information about a service handle.
:param service_handle: the service handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if service_handle not in self._data.services:
return None
node_handle = self._data.services.loc[service_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
service_name = self._data.services.loc[service_handle, 'service_name']
service_info = {'service': service_name}
return {**node_handle_info, **service_info}
def get_client_handle_info(self, client_handle: int) -> Union[Mapping[str, Any], None]:
"""
Get information about a client handle.
:param client_handle: the client handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if client_handle not in self._data.clients:
return None
node_handle = self._data.clients.loc[client_handle, 'node_handle']
node_handle_info = self.get_node_handle_info(node_handle)
service_name = self._data.clients.loc[client_handle, 'service_name']
service_info = {'service': service_name}
return {**node_handle_info, **service_info}
def get_node_handle_info(self, node_handle: int) -> Union[Mapping[str, Any], None]:
"""
Get information about a node handle.
:param node_handle: the node handle value
:return: a dictionary with name:value info, or `None` if it fails
"""
if node_handle not in self._data.nodes.index:
return None
node_name = self._data.nodes.loc[node_handle, 'name']
tid = self._data.nodes.loc[node_handle, 'tid']
return {'node': node_name, 'tid': tid}
def format_info_dict(self, info_dict: Mapping[str, Any]) -> str:
return ', '.join([f'{key}: {val}' for key, val in info_dict.items()])