Merge "Compare two cts reports." into main
@@ -64,7 +64,7 @@ def aggregate_cts_reports(report_files):
      msg = (f'{report_file} is incompatible to {first_report_file}.')
      raise UserWarning(msg)

    report.read_test_result_xml(xml_path)
    report.read_test_result_xml(xml_path)

  return report
169  tools/compare_cts_reports/compare_cts_reports.py  Executable file
@@ -0,0 +1,169 @@
#!/usr/bin/python3
#
# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
"""Compare failed tests in CTS/VTS test_result.xml.

Given two report files (A and B), this script compares them in two modes:
One-way mode: For all the failed tests in A, list the tests and the results in
              both reports.
Two-way mode: For all the tests in A and B, list the tests and the results in
              both reports. If a test only exists in one report, show NO_DATA
              in the other one.

Usage example:
  ./compare_cts_reports.py -r test_result_1.xml test_result_2.xml
      -r test_result_3.xml -m 1 -d output_dir [-o]
  For this command line, the script aggregates test_result_1.xml and
  test_result_2.xml as one report, and then compares it with test_result_3.xml
  under one-way mode. The comparison result is written into output_dir/diff.csv.
"""

import argparse
import csv
import os
import tempfile

import aggregate_cts_reports
import parse_cts_report


def one_way_compare(reports, diff_csv):
  """Compare two reports in One-way Mode.

  Given two sets of reports, aggregate them into two reports (A and B).
  Then, list all failed tests in A, and show the result of the same test in A
  and B.

  Args:
    reports: list of reports
    diff_csv: path to csv which stores comparison results
  """

  report_a = reports[0]
  report_b = reports[1]

  with open(diff_csv, 'w') as diff_csvfile:
    diff_writer = csv.writer(diff_csvfile)
    diff_writer.writerow(['module_name', 'abi', 'class_name', 'test_name',
                          'result in A', 'result in B'])

    for keys in report_a.gen_keys_list():
      module_name, abi, class_name, test_name = keys
      result_in_a = report_a.get_test_status(module_name, abi,
                                             class_name, test_name)

      if parse_cts_report.CtsReport.is_fail(result_in_a):
        result_in_b = report_b.get_test_status(module_name, abi,
                                               class_name, test_name)

        diff_writer.writerow([module_name, abi, class_name, test_name,
                              result_in_a, result_in_b])


def two_way_compare(reports, diff_csv):
  """Compare two reports in Two-way Mode.

  Given two sets of reports, aggregate them into two reports (A and B).
  Then, list all tests and show the results in A and B. If a test result exists
  in only one report, consider the result as NO_DATA in the other report.

  Args:
    reports: list of reports
    diff_csv: path to csv which stores comparison results
  """

  diff = {}

  for i, report in enumerate(reports):
    for keys in report.gen_keys_list():
      module_name, abi, class_name, test_name = keys

      abis = diff.setdefault(module_name, {})
      test_classes = abis.setdefault(abi, {})
      tests = test_classes.setdefault(class_name, {})

      result = report.get_test_status(module_name, abi, class_name, test_name)

      if test_name not in tests:
        tests[test_name] = [parse_cts_report.NO_DATA, parse_cts_report.NO_DATA]

      tests[test_name][i] = result

  with open(diff_csv, 'w') as diff_csvfile:
    diff_writer = csv.writer(diff_csvfile)
    diff_writer.writerow(['module_name', 'abi', 'class_name', 'test_name',
                          'result in A', 'result in B'])

    for module_name, abis in diff.items():
      for abi, test_classes in abis.items():
        for class_name, tests in test_classes.items():
          for test_name, results in tests.items():
            if results[0] != results[1]:
              row = [module_name, abi, class_name, test_name] + results
              diff_writer.writerow(row)


def main():
  parser = argparse.ArgumentParser()

  parser.add_argument('--reports', '-r', required=True, nargs='+',
                      help=('Path to cts reports. Each flag -r is followed by '
                            'a group of files to be aggregated as one report.'),
                      action='append')
  parser.add_argument('--mode', '-m', required=True, choices=['1', '2'],
                      help='Comparison mode. 1: One-way mode. 2: Two-way mode.')
  parser.add_argument('--output-dir', '-d', required=True,
                      help='Directory to store output files.')
  parser.add_argument('--csv', default='diff.csv', help='Path to csv output.')
  parser.add_argument('--output-files', '-o', action='store_true',
                      help='Output parsed csv files.')

  args = parser.parse_args()

  report_files = args.reports
  mode = args.mode
  if (mode in ['1', '2']) and (len(report_files) != 2):
    msg = 'Two sets of reports are required for one-way and two-way mode.'
    raise UserWarning(msg)

  output_dir = args.output_dir
  if not os.path.exists(output_dir):
    raise FileNotFoundError(f'Output directory {output_dir} does not exist.')

  diff_csv = os.path.join(output_dir, args.csv)

  ctsreports = []
  for i, report_group in enumerate(report_files):
    report = aggregate_cts_reports.aggregate_cts_reports(report_group)

    if args.output_files:
      device_name = report.info['build_device']
      sub_dir_name = tempfile.mkdtemp(prefix=f'{i}_{device_name}_',
                                      dir=output_dir)
      report.output_files(sub_dir_name)

    ctsreports.append(report)

  if args.mode == '1':
    one_way_compare(ctsreports, diff_csv)
  elif args.mode == '2':
    two_way_compare(ctsreports, diff_csv)
  else:
    # TODO(b/292453652): Implement N-way comparison.
    print('Error: Arg --mode must be 1 or 2.')


if __name__ == '__main__':
  main()
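For illustration only (not part of the change): the sketch below mimics what one_way_compare() does, using a hypothetical FakeReport stand-in for the two CtsReport methods the script relies on, gen_keys_list() and get_test_status(). The module, class, and test names are made up, and the fail check is simplified to a string comparison instead of parse_cts_report.CtsReport.is_fail().

import csv
import io

NO_DATA = 'null'  # mirrors the sentinel parse_cts_report introduces in this change


class FakeReport:
  """Hypothetical stand-in exposing only the interface one_way_compare() uses."""

  def __init__(self, results):
    # results: {(module_name, abi, class_name, test_name): status}
    self.results = results

  def gen_keys_list(self):
    return [list(key) for key in self.results]

  def get_test_status(self, module_name, abi, class_name, test_name):
    return self.results.get((module_name, abi, class_name, test_name), NO_DATA)


report_a = FakeReport(
    {('CtsExampleTestCases', 'arm64-v8a', 'ExampleTest', 'testFoo'): 'fail'})
report_b = FakeReport(
    {('CtsExampleTestCases', 'arm64-v8a', 'ExampleTest', 'testFoo'): 'pass'})

out = io.StringIO()
writer = csv.writer(out)
writer.writerow(['module_name', 'abi', 'class_name', 'test_name',
                 'result in A', 'result in B'])
for module_name, abi, class_name, test_name in report_a.gen_keys_list():
  result_in_a = report_a.get_test_status(module_name, abi, class_name, test_name)
  if result_in_a == 'fail':  # simplified; the script defers to CtsReport.is_fail()
    result_in_b = report_b.get_test_status(module_name, abi, class_name, test_name)
    writer.writerow([module_name, abi, class_name, test_name,
                     result_in_a, result_in_b])
print(out.getvalue())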
@@ -33,6 +33,9 @@ import zipfile
 # TODO(b/293809772): Logging.
 
 
+NO_DATA = 'null'
+
+
 class CtsReport:
   """Class to record the test result of a cts report."""
 
@@ -44,6 +47,27 @@ class CtsReport:
     self.result_tree = {}
     self.module_summaries = {}
 
+  @staticmethod
+  def is_fail(status):
+    if status == NO_DATA:
+      return False
+    else:
+      return (CtsReport.STATUS_ORDER.index(status) >= 3)
+
+  def gen_keys_list(self):
+    """Generate a 2D-list of keys."""
+
+    keys_list = []
+
+    modules = self.result_tree
+
+    for module_name, abis in modules.items():
+      for abi, test_classes in abis.items():
+        for class_name, tests in test_classes.items():
+          for test_name in tests.keys():
+            keys_list.append([module_name, abi, class_name, test_name])
+    return keys_list
+
+  def is_compatible(self, info):
+    return self.info['build_fingerprint'] == info['build_fingerprint']
 
@@ -51,19 +75,19 @@
     """Get test status from the CtsReport object."""
 
     if module_name not in self.result_tree:
-      return None
+      return NO_DATA
     abis = self.result_tree[module_name]
 
     if abi not in abis:
-      return None
+      return NO_DATA
     test_classes = abis[abi]
 
     if class_name not in test_classes:
-      return None
+      return NO_DATA
     tests = test_classes[class_name]
 
     if test_name not in tests:
-      return None
+      return NO_DATA
 
     return tests[test_name]
 
@@ -77,7 +101,7 @@ class CtsReport:
     test_classes = abis.setdefault(abi, {})
     tests = test_classes.setdefault(class_name, {})
 
-    if not previous:
+    if previous == NO_DATA:
       tests[test_name] = test_status
 
     module_summary = self.module_summaries.setdefault(module_name, {})
 
@@ -171,9 +195,7 @@ class CtsReport:
     """Record the result summary of each (module, abi) pair."""
 
     def __init__(self):
-      self.counter = {}
-      for status in CtsReport.STATUS_ORDER:
-        self.counter[status] = 0
+      self.counter = dict.fromkeys(CtsReport.STATUS_ORDER, 0)
 
     def print_summary(self):
       for key in CtsReport.STATUS_ORDER:
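As a rough illustration of the data structure these hunks operate on (the nesting is assumed from get_test_status() above, and the module, class, and test names are made up), result_tree maps module -> abi -> class -> test, and after this change every lookup miss yields the NO_DATA sentinel 'null' rather than None. The helper below is a simplified mirror using chained dict.get() calls, not the real method, which uses explicit membership checks.

NO_DATA = 'null'  # same sentinel the change introduces in parse_cts_report

# Assumed shape of CtsReport.result_tree (names are hypothetical).
result_tree = {
    'CtsExampleTestCases': {                     # module_name
        'arm64-v8a': {                           # abi
            'android.example.ExampleTest': {     # class_name
                'testFoo': 'pass',               # test_name -> status
            },
        },
    },
}


def get_test_status(tree, module_name, abi, class_name, test_name):
  """Simplified mirror of the updated lookup: every miss returns NO_DATA."""
  tests = tree.get(module_name, {}).get(abi, {}).get(class_name, {})
  return tests.get(test_name, NO_DATA)


print(get_test_status(result_tree, 'CtsExampleTestCases', 'arm64-v8a',
                      'android.example.ExampleTest', 'testFoo'))   # pass
print(get_test_status(result_tree, 'CtsMissingTestCases', 'arm64-v8a',
                      'SomeClass', 'testBar'))                     # null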
@@ -1,208 +0,0 @@
#!/usr/bin/python3
"""Compare failed tests in CTS/VTS test_result.xml.

Given two test_result.xml's (A and B), this script lists all failed tests in A,
and shows the result of the same test in B.
"""

import argparse
import collections
import csv
import xml.etree.ElementTree as ET


PASS = 'pass'
FAIL = 'fail'
NO_DATA = 'no_data'

ATTRS_TO_SHOW = ['Result::Build.build_model',
                 'Result::Build.build_id',
                 'Result.suite_name',
                 'Result.suite_plan',
                 'Result.suite_build_number',
                 'Result.start_display',
                 'Result::Build.build_abis_32',
                 'Result::Build.build_abis_64',]


def parse_attrib_path(attrib_path):
  first_dot = attrib_path.index('.')
  tags = attrib_path[:first_dot].split('::')
  attr_name = attrib_path[first_dot+1:]
  return tags, attr_name


def get_test_info(root):
  """Get test info from test_result.xml."""

  test_info = collections.OrderedDict()

  for attrib_path in ATTRS_TO_SHOW:
    tags, attr_name = parse_attrib_path(attrib_path)
    node = root

    while True:
      tags = tags[1:]
      if tags:
        node = node.find(tags[0])
      else:
        break

    test_info[attr_name] = node.attrib[attr_name]

  return test_info


def print_test_infos(test_result_a, test_result_b):
  """Print test information of both results in table format."""

  info_a = test_result_a['info']
  info_b = test_result_b['info']

  max_key_len = max([len(k) for k in info_a])
  max_value_a_len = max([len(info_a[k]) for k in info_a])
  max_value_b_len = max([len(info_b[k]) for k in info_b])
  table_len = (max_key_len + 2 + max_value_a_len + 2 + max_value_b_len)

  line_format = '{:{}}  {:{}}  {}'

  print('=' * table_len)

  for key in info_a:
    print(line_format.format(key, max_key_len,
                             info_a[key], max_value_a_len,
                             info_b[key]))

  print('=' * table_len)
  print()


def get_result(test_result, module_name, testcase_name, test_name):
  """Get result of specific module, testcase and test name."""

  modules = test_result['modules']
  if module_name not in modules:
    return NO_DATA

  testcases = modules[module_name]
  if testcase_name not in testcases:
    return NO_DATA

  tests = testcases[testcase_name]
  if test_name not in tests:
    return NO_DATA

  return ', '.join([x + ': ' + y for x, y in tests[test_name].items()])


def read_test_result_xml(test_result_path):
  """Given the path to a test_result.xml, read that into an ordered dict."""

  tree = ET.parse(test_result_path)
  root = tree.getroot()

  test_result = collections.OrderedDict()
  test_result['info'] = get_test_info(root)

  modules = collections.OrderedDict()
  test_result['modules'] = modules

  for module in root.iter('Module'):
    abi = module.attrib['abi']
    module_name = module.attrib['name']

    if module_name not in modules:
      modules[module_name] = collections.OrderedDict()

    testcases = modules[module_name]

    for testcase in module.iter('TestCase'):
      testcase_name = testcase.attrib['name']

      if testcase_name not in testcases:
        testcases[testcase_name] = collections.OrderedDict()

      tests = testcases[testcase_name]

      for test in testcase.iter('Test'):
        test_name = test.attrib['name']

        if test_name not in tests:
          tests[test_name] = collections.OrderedDict()

        if abi in tests[test_name]:
          print('[WARNING] duplicated test:', test_name)

        tests[test_name][abi] = test.attrib['result']

  return test_result


def compare_failed_tests(test_result_a, test_result_b, csvfile):
  """Do the comparison.

  Given two test result dicts (A and B), list all failed tests in A and display
  the result of the same test in B.

  Args:
    test_result_a: the dict returned from read_test_result(test_result_a.xml)
    test_result_b: the dict returned from read_test_result(test_result_b.xml)
    csvfile: an opened file

  Returns:
    string: diff report, summary
  """

  writer = csv.writer(csvfile)
  writer.writerow(['module', 'testcase', 'test', 'result in B'])

  summary = ''

  modules = test_result_a['modules']

  for module_name, testcases in modules.items():
    module_sub_summary = ''

    for testcase_name, tests in testcases.items():
      testcase_sub_summary = ''

      for test_name, result in tests.items():
        if FAIL in result.values():
          result_b = get_result(
              test_result_b, module_name, testcase_name, test_name)

          testcase_sub_summary += '  ' + test_name + ': ' + result_b + '\n'
          writer.writerow([module_name, testcase_name, test_name, result_b])

      if testcase_sub_summary:
        module_sub_summary = ' ' + testcase_name + '\n' + testcase_sub_summary

    if module_sub_summary:
      summary += module_name + '\n' + module_sub_summary + '\n'

  return summary


def main():
  parser = argparse.ArgumentParser()

  parser.add_argument('test_result_a', help='path to first test_result.xml')
  parser.add_argument('test_result_b', help='path to second test_result.xml')
  parser.add_argument('--csv', default='diff.csv', help='path to csv output')

  args = parser.parse_args()

  test_result_a = read_test_result_xml(args.test_result_a)
  test_result_b = read_test_result_xml(args.test_result_b)

  print_test_infos(test_result_a, test_result_b)

  with open(args.csv, 'w') as csvfile:
    summary = compare_failed_tests(test_result_a, test_result_b, csvfile)

  print(summary)


if __name__ == '__main__':
  main()
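For reference, a small standalone example of how the removed script split the ATTRS_TO_SHOW entries: parse_attrib_path() is copied from the deleted code above, and the printed value is the tag path that get_test_info() then walked through the XML tree.

def parse_attrib_path(attrib_path):
  # Copied from the removed script: split 'Tag::Tag.attr' into tags and attr.
  first_dot = attrib_path.index('.')
  tags = attrib_path[:first_dot].split('::')
  attr_name = attrib_path[first_dot + 1:]
  return tags, attr_name


print(parse_attrib_path('Result::Build.build_model'))
# (['Result', 'Build'], 'build_model') -- get_test_info() starts at the root
# ('Result'), descends to 'Build', and reads its 'build_model' attribute.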