Implement handling for sub callbacks
This commit is contained in:
parent
c88000bede
commit
80a62cd591
2 changed files with 47 additions and 22 deletions
|
@ -22,17 +22,20 @@ class DataModel():
|
||||||
self._publishers.set_index(['publisher_handle'], inplace=True, drop=True)
|
self._publishers.set_index(['publisher_handle'], inplace=True, drop=True)
|
||||||
self._subscriptions = pd.DataFrame(columns=['subscription_handle', 'timestamp', 'node_handle', 'rmw_handle', 'topic_name', 'depth'])
|
self._subscriptions = pd.DataFrame(columns=['subscription_handle', 'timestamp', 'node_handle', 'rmw_handle', 'topic_name', 'depth'])
|
||||||
self._subscriptions.set_index(['subscription_handle'], inplace=True, drop=True)
|
self._subscriptions.set_index(['subscription_handle'], inplace=True, drop=True)
|
||||||
|
self._subscription_callback_objects = pd.DataFrame(columns=['subscription_handle', 'timestamp', 'callback_object'])
|
||||||
|
self._subscription_callback_objects.set_index(['subscription_handle'], inplace=True, drop=True)
|
||||||
|
self._callbacks = pd.DataFrame(columns=['callback_object', 'timestamp', 'symbol'])
|
||||||
|
self._callbacks.set_index(['callback_object'], inplace=True, drop=True)
|
||||||
|
|
||||||
self._services = pd.DataFrame(columns=[])
|
self._services = pd.DataFrame(columns=[])
|
||||||
self._clients = pd.DataFrame(columns=[])
|
self._clients = pd.DataFrame(columns=[])
|
||||||
self._timers = pd.DataFrame(columns=[])
|
self._timers = pd.DataFrame(columns=[])
|
||||||
|
|
||||||
# Events
|
# Events (multiple instances, may not have a meaningful index)
|
||||||
# TODO
|
self._subscription_callbacks = pd.DataFrame(columns=['callback_object', 'timestamp', 'duration', 'intra_process'])
|
||||||
|
|
||||||
def add_context(self, context_handle, timestamp, pid):
|
def add_context(self, context_handle, timestamp, pid):
|
||||||
self._contexts.loc[context_handle] = [timestamp, pid]
|
self._contexts.loc[context_handle] = [timestamp, pid]
|
||||||
# self._contexts = self._contexts.append({'context_handle': context_handle, 'timestamp': timestamp, 'pid': pid}, ignore_index=True)
|
|
||||||
|
|
||||||
def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace):
|
def add_node(self, node_handle, timestamp, tid, rmw_handle, name, namespace):
|
||||||
self._nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace]
|
self._nodes.loc[node_handle] = [timestamp, tid, rmw_handle, name, namespace]
|
||||||
|
@ -43,6 +46,15 @@ class DataModel():
|
||||||
def add_subscription(self, subscription_handle, timestamp, node_handle, rmw_handle, topic_name, depth):
|
def add_subscription(self, subscription_handle, timestamp, node_handle, rmw_handle, topic_name, depth):
|
||||||
self._subscriptions.loc[subscription_handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
|
self._subscriptions.loc[subscription_handle] = [timestamp, node_handle, rmw_handle, topic_name, depth]
|
||||||
|
|
||||||
|
def add_subscription_callback_object(self, subscription_handle, timestamp, callback_object):
|
||||||
|
self._subscription_callback_objects.loc[subscription_handle] = [timestamp, callback_object]
|
||||||
|
|
||||||
|
def add_callback(self, callback_object, timestamp, symbol):
|
||||||
|
self._callbacks.loc[callback_object] = [timestamp, symbol]
|
||||||
|
|
||||||
|
def add_subscription_callback_instance(self, callback_object, timestamp, duration, intra_process):
|
||||||
|
self._subscription_callbacks = self._subscription_callbacks.append({'callback_object': callback_object, 'timestamp': timestamp, 'duration': duration, 'intra_process': intra_process}, ignore_index=True)
|
||||||
|
|
||||||
def print(self):
|
def print(self):
|
||||||
"""Debug method to print every contained df."""
|
"""Debug method to print every contained df."""
|
||||||
print('====================DATA MODEL====================')
|
print('====================DATA MODEL====================')
|
||||||
|
@ -52,11 +64,17 @@ class DataModel():
|
||||||
print()
|
print()
|
||||||
print(f'Publishers:\n{self._publishers.to_string()}')
|
print(f'Publishers:\n{self._publishers.to_string()}')
|
||||||
print()
|
print()
|
||||||
print(f'Subscription:\n{self._subscriptions.to_string()}')
|
print(f'Subscriptions:\n{self._subscriptions.to_string()}')
|
||||||
|
print()
|
||||||
|
print(f'Subscription callbacks:\n{self._subscription_callback_objects.to_string()}')
|
||||||
|
print()
|
||||||
|
print(f'Subscription callback instances:\n{self._subscription_callbacks.to_string()}')
|
||||||
print()
|
print()
|
||||||
print(f'Services:\n{self._services.to_string()}')
|
print(f'Services:\n{self._services.to_string()}')
|
||||||
print()
|
print()
|
||||||
print(f'Clients:\n{self._clients.to_string()}')
|
print(f'Clients:\n{self._clients.to_string()}')
|
||||||
print()
|
print()
|
||||||
print(f'Timers:\n{self._timers.to_string()}')
|
print(f'Timers:\n{self._timers.to_string()}')
|
||||||
|
print()
|
||||||
|
print(f'Callbacks:\n{self._callbacks.to_string()}')
|
||||||
print('==================================================')
|
print('==================================================')
|
||||||
|
|
|
@ -65,6 +65,9 @@ class RosProcessor(EventHandler):
|
||||||
super().__init__(handler_map)
|
super().__init__(handler_map)
|
||||||
|
|
||||||
self._data = DataModel()
|
self._data = DataModel()
|
||||||
|
|
||||||
|
# Temporary buffers
|
||||||
|
self._callback_instances = {}
|
||||||
|
|
||||||
def get_data_model(self):
|
def get_data_model(self):
|
||||||
return self._data
|
return self._data
|
||||||
|
@ -97,26 +100,29 @@ class RosProcessor(EventHandler):
|
||||||
self._data.add_subscription(sub_handle, metadata.timestamp, node_handle, rmw_handle, topic_name, depth)
|
self._data.add_subscription(sub_handle, metadata.timestamp, node_handle, rmw_handle, topic_name, depth)
|
||||||
|
|
||||||
def _handle_rclcpp_subscription_callback_added(self, event, metadata):
|
def _handle_rclcpp_subscription_callback_added(self, event, metadata):
|
||||||
# TODO
|
subscription_handle = get_field(event, 'subscription_handle')
|
||||||
pass
|
callback_object = get_field(event, 'callback')
|
||||||
# Add the callback address key and create an empty list
|
self._data.add_subscription_callback_object(subscription_handle, metadata.timestamp, callback_object)
|
||||||
# callback_addr = get_field(event, 'callback')
|
|
||||||
# self.callbacks_instances[callback_addr] = []
|
|
||||||
|
|
||||||
def _handle_rclcpp_subscription_callback_start(self, event, metadata):
|
def _handle_rclcpp_subscription_callback_start(self, event, metadata):
|
||||||
# TODO
|
# Add to dict
|
||||||
pass
|
callback_addr = get_field(event, 'callback')
|
||||||
# callback_addr = get_field(event, 'callback')
|
self._callback_instances[callback_addr] = (event, metadata)
|
||||||
# self._callback_starts[callback_addr] = metadata.timestamp
|
|
||||||
|
|
||||||
def _handle_rclcpp_subscription_callback_end(self, event, metadata):
|
def _handle_rclcpp_subscription_callback_end(self, event, metadata):
|
||||||
# TODO
|
# Fetch from dict
|
||||||
pass
|
callback_object = get_field(event, 'callback')
|
||||||
# callback_addr = get_field(event, 'callback')
|
(event_start, metadata_start) = self._callback_instances.get(callback_object)
|
||||||
# start_timestamp = self._callback_starts.pop(callback_addr, None)
|
if event_start is not None and metadata_start is not None:
|
||||||
# if start_timestamp is not None:
|
del self._callback_instances[callback_object]
|
||||||
# duration = metadata.timestamp - start_timestamp
|
duration = metadata.timestamp - metadata_start.timestamp
|
||||||
# self.callbacks_instances[callback_addr].append((duration, start_timestamp))
|
is_intra_process = get_field(event_start, 'is_intra_process')
|
||||||
|
self._data.add_subscription_callback_instance(callback_object,
|
||||||
|
metadata_start.timestamp,
|
||||||
|
duration,
|
||||||
|
bool(is_intra_process))
|
||||||
|
else:
|
||||||
|
print(f'No matching callback start for callback object "{callback_object}"')
|
||||||
|
|
||||||
def _handle_rcl_service_init(self, event, metadata):
|
def _handle_rcl_service_init(self, event, metadata):
|
||||||
# TODO
|
# TODO
|
||||||
|
@ -155,5 +161,6 @@ class RosProcessor(EventHandler):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _handle_rclcpp_callback_register(self, event, metadata):
|
def _handle_rclcpp_callback_register(self, event, metadata):
|
||||||
# TODO
|
callback_object = get_field(event, 'callback')
|
||||||
pass
|
symbol = get_field(event, 'symbol')
|
||||||
|
self._data.add_callback(callback_object, metadata.timestamp, symbol)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue