# dataflow-analysis/clang_interop/process_clang_output.py
#
# 231 lines
# 6.7 KiB
# Python

import functools
import json
import os
import pickle
from typing import Iterable
import numpy as np
import pandas as pd
from clang_interop.cl_types import ClNode, ClField, ClTimer, ClMethod, ClPublisher, ClSubscription, ClMemberRef, ClContext, \
ClTranslationUnit
# Directory that process_clang_output() reads its JSON input files from by default.
# NOTE(review): machine-specific absolute path; adjust when running elsewhere.
IN_DIR = "/home/max/Projects/ma-ros2-internal-dependency-analyzer/output"
# Root directory that SRC_FILE_NAME() prepends when mapping an input file name
# to the path of a .cpp source file.
SRC_DIR = "/home/max/Projects/autoware/src"
# File the pickled result is written to when this module is run as a script.
OUT_NAME = "clang_objects.pkl"
def SRC_FILE_NAME(in_file_name: str):
    """Map the name of a JSON input file to a source path below ``SRC_DIR``.

    Every ``-`` in the name is turned into a path separator and every
    ``.json`` into ``.cpp``; the result is joined onto ``SRC_DIR``.

    :param in_file_name: name of a file found in the input directory
    :return: the corresponding path as a string
    """
    relative_path = in_file_name.replace("-", "/")
    relative_path = relative_path.replace(".json", ".cpp")
    return os.path.join(SRC_DIR, relative_path)
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably a collection of identifiers to skip -- confirm before removing.
ignored_idfs = set()
class SetEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return list(o)
match o:
case set():
return list(o)
case list() | dict() | int() | float() | str():
return json.JSONEncoder.default(self, o)
case np.int64:
return json.JSONEncoder.default(self, int(o))
return json.JSONEncoder.default(self, o)
def fuse_fields(f1, f2):
    """Merge two values of the same field into one.

    :param f1: first value, or ``None`` if unknown
    :param f2: second value, or ``None`` if unknown
    :return: the value that is not ``None``, or the common value if both agree
        (``None`` when both are ``None``)
    :raises ValueError: if both values are set but differ
    """
    # A missing side simply yields the other one (which may itself be None).
    if f1 is None or f2 is None:
        return f2 if f1 is None else f1
    if not f1 == f2:
        raise ValueError(f"Inconsistent fields {f1=} and {f2=} cannot be fused")
    return f1
def fuse_objects(o1, o2):
    """Merge the dataclass fields of ``o2`` into ``o1``, in place.

    Each field listed in ``o1.__dataclass_fields__`` is replaced by the result
    of ``fuse_fields`` applied to the values held by ``o1`` and ``o2``.

    :param o1: object that receives the merged values (mutated)
    :param o2: object whose values are merged in
    :return: ``o1``
    :raises ValueError: propagated from ``fuse_fields`` when a field differs
    """
    for field_name in o1.__dataclass_fields__:
        merged_value = fuse_fields(getattr(o1, field_name), getattr(o2, field_name))
        setattr(o1, field_name, merged_value)
    return o1
def find_data_deps(accesses: Iterable[ClMemberRef]):
writes = set()
reads = set()
publications = {}
for member_ref in accesses:
member_id = member_ref.member_chain[0] if member_ref.member_chain else None
if member_id is None:
print(f"[WARN ] MemberRef without any members in chain @ {member_ref.source_range}")
continue
dep_tuple = (member_ref.method_id, member_id)
match member_ref.type:
case "write":
writes.add(dep_tuple)
case "read":
reads.add(dep_tuple)
case "call" | "arg":
writes.add(dep_tuple)
reads.add(dep_tuple)
case "pub":
if member_ref.method_id not in publications:
publications[member_ref.method_id] = set()
publications[member_ref.method_id].add(member_id)
reads = pd.DataFrame.from_records(list(reads), columns=['method_id', 'member_id'])
writes = pd.DataFrame.from_records(list(writes), columns=['method_id', 'member_id'])
deps = {}
for reading_method in reads["method_id"].unique().tolist():
deps[reading_method] = set()
read_members = reads[reads['method_id'] == reading_method]["member_id"].unique().tolist()
for read_member in read_members:
writing_methods = writes[writes['member_id'] == read_member]['method_id'].unique().tolist()
deps[reading_method].update(writing_methods)
deps[reading_method].discard(reading_method) # Remove reflexive dependencies
return deps, publications
def dedup(elems):
    """Collapse elements that share a hash value into a single element.

    Elements are grouped by the value of their ``__hash__()``. A group with
    one member is kept as is; a larger group is folded into its first member
    with ``fuse_objects`` and a short message is printed.

    :param elems: iterable of hashable elements
    :return: set containing one element per distinct hash value
    :raises ValueError: propagated from ``fuse_objects`` when elements of a
        group disagree on a field
    """
    groups_by_hash = {}
    for item in elems:
        groups_by_hash.setdefault(item.__hash__(), []).append(item)

    unique_items = []
    for group in groups_by_hash.values():
        if len(group) > 1:
            # Without an initial value reduce() starts from the first member,
            # which therefore absorbs all the others.
            fused = functools.reduce(fuse_objects, group)
            unique_items.append(fused)
            print(f"Fused {len(group)} {type(fused)}s")
        else:
            unique_items.extend(group)
    return set(unique_items)
def dictify(elems, key='id'):
    """Index ``elems`` by one of their attributes.

    :param elems: iterable of objects that all have the attribute ``key``
    :param key: name of the attribute whose value becomes the dict key
    :return: dict mapping attribute value to element; when several elements
        share a value, the last one wins
    """
    lookup = {}
    for elem in elems:
        lookup[getattr(elem, key)] = elem
    return lookup
def definitions_from_json(cb_dict, tu):
    """Build the ``Cl*`` objects described by one parsed JSON document.

    The optional top-level keys ``"nodes"``, ``"publishers"``,
    ``"subscriptions"``, ``"timers"`` and ``"accesses"`` are processed in that
    order. Methods are gathered from node ``"methods"`` lists, from the
    ``"callback"`` of subscriptions and timers, and from the
    ``["context"]["method"]`` of accesses.

    :param cb_dict: parsed JSON content of one input file
    :param tu: translation unit object handed to every constructed object
    :return: tuple ``(nodes, pubs, subs, timers, fields, methods, accesses)``;
        all entries except ``accesses`` have been passed through ``dedup``
        (sets), ``accesses`` is a plain list
    """
    node_defs, field_defs, method_defs = [], [], []
    publisher_defs, subscription_defs, timer_defs = [], [], []
    member_refs = []

    if "nodes" in cb_dict:
        for node_json in cb_dict["nodes"]:
            node_defs.append(ClNode(node_json, tu))
            field_defs.extend(ClField(field_json, tu) for field_json in node_json["fields"])
            method_defs.extend(ClMethod(method_json, tu) for method_json in node_json["methods"])

    if "publishers" in cb_dict:
        publisher_defs.extend(ClPublisher(pub_json, tu) for pub_json in cb_dict["publishers"])

    if "subscriptions" in cb_dict:
        for sub_json in cb_dict["subscriptions"]:
            subscription_defs.append(ClSubscription(sub_json, tu))
            if "callback" in sub_json:
                method_defs.append(ClMethod(sub_json["callback"], tu))

    if "timers" in cb_dict:
        for timer_json in cb_dict["timers"]:
            timer_defs.append(ClTimer(timer_json, tu))
            if "callback" in timer_json:
                method_defs.append(ClMethod(timer_json["callback"], tu))

    if "accesses" in cb_dict:
        accesses_by_type = cb_dict["accesses"]
        for access_type in accesses_by_type:
            for access_json in accesses_by_type[access_type]:
                member_refs.append(ClMemberRef(access_json, tu))
                if "method" in access_json["context"]:
                    method_defs.append(ClMethod(access_json["context"]["method"], tu))

    # Accesses are deliberately returned as collected; everything else is
    # de-duplicated.
    return (
        dedup(node_defs),
        dedup(publisher_defs),
        dedup(subscription_defs),
        dedup(timer_defs),
        dedup(field_defs),
        dedup(method_defs),
        member_refs,
    )
def process_clang_output(directory=IN_DIR):
    """Load every JSON file in ``directory`` and combine the results.

    Each file is parsed, converted into ``Cl*`` objects with
    ``definitions_from_json`` and analyzed with ``find_data_deps``; the
    per-file results are accumulated into one ``ClContext``.

    :param directory: directory containing the JSON files to process
    :return: ``ClContext`` built from all translation units, nodes,
        publishers, subscriptions, timers, fields, methods, accesses,
        dependencies and publications found
    """
    all_tus = set()
    all_nodes = set()
    all_pubs = set()
    all_subs = set()
    all_timers = set()
    all_fields = set()
    all_methods = set()
    all_accesses = []
    all_deps = {}
    all_publications = {}

    # Honor the ``directory`` argument (not the module default), and sort the
    # listing because os.listdir() returns entries in arbitrary order.
    for filename in sorted(os.listdir(directory)):
        source_filename = SRC_FILE_NAME(filename)
        print(f"Processing {source_filename}")

        with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
            cb_dict = json.load(f)

        # A JSON ``null`` document carries no definitions.
        if cb_dict is None:
            print(f" [WARN ] Empty tool output detected in {filename}")
            continue

        tu = ClTranslationUnit(source_filename)
        all_tus.add(tu)

        nodes, pubs, subs, timers, fields, methods, accesses = definitions_from_json(cb_dict, tu)
        deps, publications = find_data_deps(accesses)

        all_nodes.update(nodes)
        all_pubs.update(pubs)
        all_subs.update(subs)
        all_timers.update(timers)
        all_fields.update(fields)
        all_methods.update(methods)
        all_accesses += accesses
        # NOTE(review): dict.update() replaces the entry of a method id that
        # was already recorded for an earlier file instead of merging the
        # sets -- confirm that method ids cannot repeat across files.
        all_deps.update(deps)
        all_publications.update(publications)

    clang_context = ClContext(all_tus, all_nodes, all_pubs, all_subs, all_timers, all_fields, all_methods, all_accesses,
                              all_deps, all_publications)
    return clang_context
if __name__ == "__main__":
    # Run the whole extraction first, so the output file is only opened once
    # processing has finished.
    context = process_clang_output()
    with open(OUT_NAME, "wb") as pickle_file:
        pickle.dump(context, pickle_file)
    print("Done.")