Merge "The script parse_cts_report.py converts single cts report into \ three files: info.json result.csv summary.csv" into main
This commit is contained in:
94
tools/compare_failed_tests/aggregate_reports.py
Executable file
94
tools/compare_failed_tests/aggregate_reports.py
Executable file
@@ -0,0 +1,94 @@
|
||||
#!/usr/bin/python3
|
||||
#
|
||||
# Copyright (C) 2023 The Android Open Source Project
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
# use this file except in compliance with the License. You may obtain a copy of
|
||||
# the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
#
|
||||
"""Aggregate several cts reports into information files.
|
||||
|
||||
Given several cts reports, where a cts report could be a zip file or
|
||||
test_result.xml, this script convert them into one set of information files.
|
||||
The reports must be based on the same build fingerprint.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import tempfile
|
||||
import zipfile
|
||||
|
||||
import parse_cts_report
|
||||
|
||||
|
||||
def aggregate_cts_reports(report_files):
|
||||
"""Aggregate all report files and produce information files to output_dir.
|
||||
|
||||
If the results of the same test are different in two reports, choose the one
|
||||
with a higher priority, following the order: PASS > IGNORED
|
||||
> ASSUMPTION_FAILURE > FAIL > TEST_ERROR > TEST_STATUS_UNSPECIFIED.
|
||||
|
||||
Args:
|
||||
report_files: A list of paths to cts reports.
|
||||
|
||||
Raises:
|
||||
UserWarning: Report files not compatible.
|
||||
|
||||
Returns:
|
||||
A dictionary that maps build_fingerprint to a CtsReport object.
|
||||
"""
|
||||
|
||||
first_report_file = report_files[0]
|
||||
|
||||
report = parse_cts_report.parse_report_file(first_report_file)
|
||||
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
|
||||
for report_file in report_files[1:]:
|
||||
xml_path = (
|
||||
parse_cts_report.extract_xml_from_zip(report_file, temp_dir)
|
||||
if zipfile.is_zipfile(report_file)
|
||||
else report_file)
|
||||
|
||||
test_info = parse_cts_report.get_test_info_xml(xml_path)
|
||||
|
||||
if not report.is_compatible(test_info):
|
||||
msg = (f'{report_file} is incompatible to {first_report_file}.')
|
||||
raise UserWarning(msg)
|
||||
|
||||
report.read_test_result_xml(xml_path)
|
||||
|
||||
return report
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
||||
parser.add_argument('--report-files', required=True, nargs='+',
|
||||
help=('Path to cts report(s), where a cts report could '
|
||||
'be a zip archive or a xml file.'))
|
||||
parser.add_argument('-d', '--output-dir', required=True,
|
||||
help=('Path to the directory to store output files.'))
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
report_files = args.report_files
|
||||
output_dir = args.output_dir
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
raise FileNotFoundError(f'Output directory {output_dir} does not exist.')
|
||||
|
||||
report = aggregate_cts_reports(report_files)
|
||||
report.output_files(output_dir)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -14,13 +14,10 @@
|
||||
# License for the specific language governing permissions and limitations under
|
||||
# the License.
|
||||
#
|
||||
"""Turn a cts report into three information files.
|
||||
"""Convert single cts report into information files.
|
||||
|
||||
Given a zip file or a test_result.xml, this script read the xml file
|
||||
and produce three output files:
|
||||
info.json
|
||||
result.csv
|
||||
summary.csv
|
||||
Given a cts report, which could be a zip file or test_result.xml, this script
|
||||
turns them into three files: info.json, result.csv, and summary.csv.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -28,31 +25,163 @@ import csv
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import xml.etree.ElementTree as ET
|
||||
import zipfile
|
||||
|
||||
|
||||
# TODO(b/293809772): Logging test result.
|
||||
# TODO(b/293809772): Aggregate several CTS reports.
|
||||
# TODO(b/293809772): Logging.
|
||||
|
||||
|
||||
class ModuleSummary():
|
||||
"""Class to record the test result summary of a cts module."""
|
||||
class CtsReport:
|
||||
"""Class to record the test result of a cts report."""
|
||||
|
||||
def __init__(self):
|
||||
self.counter = {
|
||||
'pass': 0,
|
||||
'fail': 0,
|
||||
'IGNORED': 0,
|
||||
'ASSUMPTION_FAILURE': 0,
|
||||
'TEST_ERROR': 0,
|
||||
'TEST_STATUS_UNSPECIFIED': 0,
|
||||
}
|
||||
STATUS_ORDER = ['pass', 'IGNORED', 'ASSUMPTION_FAILURE', 'fail',
|
||||
'TEST_ERROR', 'TEST_STATUS_UNSPECIFIED']
|
||||
|
||||
def print_info(self):
|
||||
for key, value in self.counter.items():
|
||||
print(f'{key}: {value}')
|
||||
print()
|
||||
def __init__(self, info):
|
||||
self.info = info
|
||||
self.result_tree = {}
|
||||
self.module_summaries = {}
|
||||
|
||||
def is_compatible(self, info):
|
||||
return self.info['build_fingerprint'] == info['build_fingerprint']
|
||||
|
||||
def get_test_status(self, module_name, abi, class_name, test_name):
|
||||
"""Get test status from the CtsReport object."""
|
||||
|
||||
if module_name not in self.result_tree:
|
||||
return None
|
||||
abis = self.result_tree[module_name]
|
||||
|
||||
if abi not in abis:
|
||||
return None
|
||||
test_classes = abis[abi]
|
||||
|
||||
if class_name not in test_classes:
|
||||
return None
|
||||
tests = test_classes[class_name]
|
||||
|
||||
if test_name not in tests:
|
||||
return None
|
||||
|
||||
return tests[test_name]
|
||||
|
||||
def set_test_status(self, module_name, abi,
|
||||
class_name, test_name, test_status):
|
||||
"""Set test status to the CtsReport object."""
|
||||
|
||||
previous = self.get_test_status(module_name, abi, class_name, test_name)
|
||||
|
||||
abis = self.result_tree.setdefault(module_name, {})
|
||||
test_classes = abis.setdefault(abi, {})
|
||||
tests = test_classes.setdefault(class_name, {})
|
||||
|
||||
if not previous:
|
||||
tests[test_name] = test_status
|
||||
|
||||
module_summary = self.module_summaries.setdefault(module_name, {})
|
||||
summary = module_summary.setdefault(abi, self.ModuleSummary())
|
||||
summary.counter[test_status] += 1
|
||||
|
||||
elif (CtsReport.STATUS_ORDER.index(test_status)
|
||||
< CtsReport.STATUS_ORDER.index(previous)):
|
||||
summary = self.module_summaries[module_name][abi]
|
||||
|
||||
tests[test_name] = test_status
|
||||
|
||||
summary.counter[previous] -= 1
|
||||
summary.counter[test_status] += 1
|
||||
|
||||
def read_test_result_xml(self, test_result_path):
|
||||
"""Read the result from test_result.xml into a CtsReport object."""
|
||||
|
||||
tree = ET.parse(test_result_path)
|
||||
root = tree.getroot()
|
||||
|
||||
for module in root.iter('Module'):
|
||||
module_name = module.attrib['name']
|
||||
abi = module.attrib['abi']
|
||||
|
||||
for testcase in module.iter('TestCase'):
|
||||
class_name = testcase.attrib['name']
|
||||
|
||||
for test in testcase.iter('Test'):
|
||||
test_name = test.attrib['name']
|
||||
result = test.attrib['result']
|
||||
self.set_test_status(module_name, abi,
|
||||
class_name, test_name, result)
|
||||
|
||||
def write_to_csv(self, result_csvfile, summary_csvfile):
|
||||
"""Write the information of the report to the csv files.
|
||||
|
||||
Args:
|
||||
result_csvfile: path to result.csv
|
||||
summary_csvfile: path to summary.csv
|
||||
"""
|
||||
|
||||
summary_writer = csv.writer(summary_csvfile)
|
||||
summary_writer.writerow(['module_name', 'abi'] + CtsReport.STATUS_ORDER)
|
||||
|
||||
result_writer = csv.writer(result_csvfile)
|
||||
result_writer.writerow(['module_name', 'abi',
|
||||
'class_name', 'test_name', 'result'])
|
||||
|
||||
modules = self.result_tree
|
||||
|
||||
for module_name, abis in modules.items():
|
||||
for abi, test_classes in abis.items():
|
||||
module_summary = self.module_summaries[module_name][abi]
|
||||
summary = module_summary.summary_list()
|
||||
summary_writer.writerow([module_name, abi] + summary)
|
||||
|
||||
for class_name, tests in test_classes.items():
|
||||
for test_name, result in tests.items():
|
||||
result_writer.writerow([module_name, abi,
|
||||
class_name, test_name, result])
|
||||
|
||||
def output_files(self, output_dir):
|
||||
"""Produce output files into the directory."""
|
||||
|
||||
parsed_info_path = os.path.join(output_dir, 'info.json')
|
||||
parsed_result_path = os.path.join(output_dir, 'result.csv')
|
||||
parsed_summary_path = os.path.join(output_dir, 'summary.csv')
|
||||
|
||||
files = [parsed_info_path, parsed_result_path, parsed_summary_path]
|
||||
|
||||
for f in files:
|
||||
if os.path.exists(f):
|
||||
raise FileExistsError(f'Output file {f} already exists.')
|
||||
|
||||
with open(parsed_info_path, 'w') as info_file:
|
||||
info_file.write(json.dumps(self.info, indent=2))
|
||||
|
||||
with (
|
||||
open(parsed_result_path, 'w') as result_csvfile,
|
||||
open(parsed_summary_path, 'w') as summary_csvfile,
|
||||
):
|
||||
self.write_to_csv(result_csvfile, summary_csvfile)
|
||||
|
||||
for f in files:
|
||||
print(f'Parsed output {f}')
|
||||
|
||||
return files
|
||||
|
||||
class ModuleSummary:
|
||||
"""Record the result summary of each (module, abi) pair."""
|
||||
|
||||
def __init__(self):
|
||||
self.counter = {}
|
||||
for status in CtsReport.STATUS_ORDER:
|
||||
self.counter[status] = 0
|
||||
|
||||
def print_summary(self):
|
||||
for key in CtsReport.STATUS_ORDER:
|
||||
print(f'{key}: {self.counter[key]}')
|
||||
print()
|
||||
|
||||
def summary_list(self):
|
||||
return [self.counter[type] for type in CtsReport.STATUS_ORDER]
|
||||
|
||||
|
||||
ATTRS_TO_SHOW = ['Result::Build.build_model',
|
||||
@@ -68,12 +197,6 @@ ATTRS_TO_SHOW = ['Result::Build.build_model',
|
||||
'Result.suite_version',
|
||||
'Result.suite_plan',
|
||||
'Result.suite_build_number',]
|
||||
RESULTS = ['pass',
|
||||
'fail',
|
||||
'IGNORED',
|
||||
'ASSUMPTION_FAILURE',
|
||||
'TEST_ERROR',
|
||||
'TEST_STATUS_UNSPECIFIED',]
|
||||
|
||||
|
||||
def parse_attrib_path(attrib_path):
|
||||
@@ -84,10 +207,13 @@ def parse_attrib_path(attrib_path):
|
||||
return tags, attr_name
|
||||
|
||||
|
||||
def get_test_info(root):
|
||||
"""Get test info from xml tree."""
|
||||
def get_test_info_xml(test_result_path):
|
||||
"""Get test info from xml file."""
|
||||
|
||||
test_info = {}
|
||||
tree = ET.parse(test_result_path)
|
||||
root = tree.getroot()
|
||||
|
||||
test_info = {'source_path': test_result_path}
|
||||
|
||||
for attrib_path in ATTRS_TO_SHOW:
|
||||
tags, attr_name = parse_attrib_path(attrib_path)
|
||||
@@ -105,11 +231,9 @@ def get_test_info(root):
|
||||
return test_info
|
||||
|
||||
|
||||
def print_test_info(test_result):
|
||||
def print_test_info(info):
|
||||
"""Print test information of the result in table format."""
|
||||
|
||||
info = test_result['info']
|
||||
|
||||
max_key_len = max([len(k) for k in info])
|
||||
max_value_len = max([len(info[k]) for k in info])
|
||||
table_len = (max_key_len + 2 + max_value_len)
|
||||
@@ -123,85 +247,34 @@ def print_test_info(test_result):
|
||||
print()
|
||||
|
||||
|
||||
def extract_xml_from_zip(zip_file_path, output_dir):
|
||||
def extract_xml_from_zip(zip_file_path, dest_dir):
|
||||
"""Extract test_result.xml from the zip file."""
|
||||
|
||||
sub_dir_name = os.path.splitext(os.path.basename(zip_file_path))[0]
|
||||
xml_path = os.path.join(sub_dir_name, 'test_result.xml')
|
||||
extracted_xml = os.path.join(output_dir, 'test_result.xml')
|
||||
extracted_xml = os.path.join(dest_dir, 'test_result.xml')
|
||||
with zipfile.ZipFile(zip_file_path) as myzip:
|
||||
with myzip.open(xml_path) as source, open(extracted_xml, 'wb') as target:
|
||||
shutil.copyfileobj(source, target)
|
||||
return extracted_xml
|
||||
|
||||
|
||||
def read_test_result_xml(test_result_path):
|
||||
"""Given the path to a test_result.xml, read that into a dict."""
|
||||
def parse_report_file(report_file):
|
||||
"""Turn one cts report into a CtsReport object."""
|
||||
|
||||
tree = ET.parse(test_result_path)
|
||||
root = tree.getroot()
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
xml_path = (
|
||||
extract_xml_from_zip(report_file, temp_dir)
|
||||
if zipfile.is_zipfile(report_file)
|
||||
else report_file)
|
||||
|
||||
test_result = {}
|
||||
test_result['info'] = get_test_info(root)
|
||||
test_info = get_test_info_xml(xml_path)
|
||||
print_test_info(test_info)
|
||||
|
||||
modules = {}
|
||||
test_result['modules'] = modules
|
||||
report = CtsReport(test_info)
|
||||
report.read_test_result_xml(xml_path)
|
||||
|
||||
for module in root.iter('Module'):
|
||||
module_name = module.attrib['name']
|
||||
abi_name = module.attrib['abi']
|
||||
|
||||
abis = modules.setdefault(module_name, {})
|
||||
testcases = abis.setdefault(abi_name, {})
|
||||
|
||||
for testcase in module.iter('TestCase'):
|
||||
testcase_name = testcase.attrib['name']
|
||||
|
||||
tests = testcases.setdefault(testcase_name, {})
|
||||
|
||||
for test in testcase.iter('Test'):
|
||||
test_name = test.attrib['name']
|
||||
|
||||
if test_name in tests:
|
||||
print('[WARNING] duplicated test:', test_name)
|
||||
|
||||
tests[test_name] = test.attrib['result']
|
||||
|
||||
return test_result
|
||||
|
||||
|
||||
def write_to_csv(test_result, result_csvfile, summary_csvfile):
|
||||
"""Given a result dict, write to the csv files.
|
||||
|
||||
Args:
|
||||
test_result: the dict returned from read_test_result(test_result.xml)
|
||||
result_csvfile: path to result.csv
|
||||
summary_csvfile: path to summary.csv
|
||||
"""
|
||||
|
||||
result_writer = csv.writer(result_csvfile)
|
||||
result_writer.writerow(['module_name', 'abi',
|
||||
'class_name', 'test_name', 'result'])
|
||||
|
||||
summary_writer = csv.writer(summary_csvfile)
|
||||
summary_writer.writerow(['module', 'abi', 'pass', 'fail', 'IGNORED',
|
||||
'ASSUMPTION_FAILURE', 'TEST_ERROR',
|
||||
'TEST_STATUS_UNSPECIFIED'])
|
||||
|
||||
modules = test_result['modules']
|
||||
|
||||
for module_name, abis in modules.items():
|
||||
module_result_summary = ModuleSummary()
|
||||
|
||||
for abi_name, testcases in abis.items():
|
||||
for testcase_name, tests in testcases.items():
|
||||
for test_name, result in tests.items():
|
||||
result_writer.writerow([module_name, abi_name,
|
||||
testcase_name, test_name, result])
|
||||
module_result_summary.counter[result] += 1
|
||||
|
||||
summary = [module_result_summary.counter[result] for result in RESULTS]
|
||||
summary_writer.writerow([module_name, abi_name] + summary)
|
||||
return report
|
||||
|
||||
|
||||
def main():
|
||||
@@ -215,34 +288,15 @@ def main():
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
report_file = args.report_file
|
||||
output_dir = args.output_dir
|
||||
|
||||
if not os.path.exists(output_dir):
|
||||
raise FileNotFoundError(f'Output directory {output_dir} does not exist.')
|
||||
|
||||
xml_path = (
|
||||
extract_xml_from_zip(args.report_file, output_dir)
|
||||
if zipfile.is_zipfile(args.report_file)
|
||||
else args.report_file)
|
||||
test_result = read_test_result_xml(xml_path)
|
||||
|
||||
print_test_info(test_result)
|
||||
|
||||
parsed_info_path = os.path.join(output_dir, 'info.json')
|
||||
parsed_result_path = os.path.join(output_dir, 'result.csv')
|
||||
parsed_summary_path = os.path.join(output_dir, 'summary.csv')
|
||||
|
||||
with open(parsed_info_path, 'w') as info_file:
|
||||
info_file.write(json.dumps(test_result['info'], indent=2))
|
||||
|
||||
with (
|
||||
open(parsed_result_path, 'w') as result_csvfile,
|
||||
open(parsed_summary_path, 'w') as summary_csvfile,
|
||||
):
|
||||
write_to_csv(test_result, result_csvfile, summary_csvfile)
|
||||
|
||||
for f in [parsed_info_path, parsed_result_path, parsed_summary_path]:
|
||||
print(f'Parsed output {f}')
|
||||
report = parse_report_file(report_file)
|
||||
|
||||
report.output_files(output_dir)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user