Extract processing path checking function
This commit is contained in:
parent
87ff5c245a
commit
329151d7d9
1 changed files with 64 additions and 18 deletions
|
@ -19,6 +19,7 @@ import argparse
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
from typing import Tuple
|
||||
|
||||
from tracetools_analysis.convert import convert
|
||||
from tracetools_analysis.convert import DEFAULT_CONVERT_FILE_NAME
|
||||
|
@ -41,49 +42,86 @@ def parse_args():
|
|||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
input_path = args.input_path
|
||||
force_conversion = args.force_conversion
|
||||
def inspect_input_path(
|
||||
input_path: str,
|
||||
force_conversion: bool = False,
|
||||
) -> Tuple[str, bool]:
|
||||
"""
|
||||
Check input path for a converted file or a trace directory.
|
||||
|
||||
converted_file_path = None
|
||||
If the input path is a file, it uses it as a converted file.
|
||||
If the input path is a directory, it checks if there is a "converted" file directly inside it,
|
||||
otherwise it tries to import the path as a trace directory.
|
||||
If `force_conversion` is set to `True`, even if a converted file is found, it will ask to
|
||||
re-create it.
|
||||
|
||||
:param input_path: the path to a converted file or trace directory
|
||||
:param force_conversion: whether to re-creating converted file even if it is found
|
||||
:return:
|
||||
the path to a converted file (or `None` if could not find),
|
||||
`True` if the given converted file should be (re-)created, `False` otherwise
|
||||
"""
|
||||
input_path = os.path.expanduser(input_path)
|
||||
converted_file_path = None
|
||||
# Check if not a file
|
||||
if not os.path.isfile(input_path):
|
||||
input_directory = input_path
|
||||
# Might be a (trace) directory
|
||||
# Check if there is a converted file under the given directory
|
||||
prospective_converted_file_path = os.path.join(input_directory, DEFAULT_CONVERT_FILE_NAME)
|
||||
if os.path.isfile(prospective_converted_file_path):
|
||||
prospective_converted_file = os.path.join(input_directory, DEFAULT_CONVERT_FILE_NAME)
|
||||
if os.path.isfile(prospective_converted_file):
|
||||
# Use that as the converted input file
|
||||
converted_file_path = prospective_converted_file_path
|
||||
converted_file_path = prospective_converted_file
|
||||
if force_conversion:
|
||||
print(f'found converted file but re-creating it: {converted_file_path}')
|
||||
convert(input_directory, DEFAULT_CONVERT_FILE_NAME)
|
||||
print(f'found converted file but will re-create it: {prospective_converted_file}')
|
||||
return prospective_converted_file, True
|
||||
else:
|
||||
print(f'found converted file: {converted_file_path}')
|
||||
print(f'found converted file: {prospective_converted_file}')
|
||||
return prospective_converted_file, False
|
||||
else:
|
||||
# Check if it is a trace directory
|
||||
# Result could be unexpected because it will look for trace directories recursively
|
||||
# (e.g. '/' is a valid trace directory if there is at least one trace anywhere)
|
||||
if is_trace_directory(input_directory):
|
||||
# Convert trace directory first to create converted file
|
||||
convert(input_directory, DEFAULT_CONVERT_FILE_NAME)
|
||||
converted_file_path = prospective_converted_file_path
|
||||
return prospective_converted_file, True
|
||||
else:
|
||||
# We cannot do anything
|
||||
print(
|
||||
f'cannot find either a trace directory or a converted file: {input_directory}',
|
||||
file=sys.stderr)
|
||||
return 1
|
||||
return None, None
|
||||
else:
|
||||
converted_file_path = input_path
|
||||
if force_conversion:
|
||||
# It's a file, but re-create it anyway
|
||||
print(f'found converted file but re-creating it: {converted_file_path}')
|
||||
input_directory = os.path.dirname(converted_file_path)
|
||||
input_file_name = os.path.basename(converted_file_path)
|
||||
convert(input_directory, input_file_name)
|
||||
print(f'found converted file but will re-create it: {converted_file_path}')
|
||||
return converted_file_path, True
|
||||
else:
|
||||
# Simplest use-case: given path is an existing converted file
|
||||
return converted_file_path, False
|
||||
|
||||
|
||||
def process(
|
||||
input_path: str,
|
||||
force_conversion: bool = False,
|
||||
) -> None:
|
||||
"""
|
||||
Process converted trace file.
|
||||
|
||||
:param input_path: the path to a converted file or trace directory
|
||||
:param force_conversion: whether to re-creating converted file even if it is found
|
||||
"""
|
||||
converted_file_path, create_converted_file = inspect_input_path(input_path, force_conversion)
|
||||
|
||||
if converted_file_path is None:
|
||||
return 1
|
||||
|
||||
# Convert trace directory to file if necessary
|
||||
if create_converted_file:
|
||||
input_directory = os.path.dirname(converted_file_path)
|
||||
input_file_name = os.path.basename(converted_file_path)
|
||||
convert(input_directory, input_file_name)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
|
@ -93,3 +131,11 @@ def main():
|
|||
time_diff = time.time() - start_time
|
||||
ros2_handler.data.print_model()
|
||||
print(f'processed {len(events)} events in {time_diff * 1000:.2f} ms')
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
input_path = args.input_path
|
||||
force_conversion = args.force_conversion
|
||||
|
||||
process(input_path, force_conversion)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue