Merge "The script parse_cts_report.py converts single cts report into \ three files: info.json result.csv summary.csv" into main am: 3492dd1d35

Original change: https://android-review.googlesource.com/c/platform/development/+/2704355

Change-Id: I4481aa6ad1453c1ea03d45457c37c00e9870af71
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
This commit is contained in:
Zoe Tsou
2023-08-15 10:24:52 +00:00
committed by Automerger Merge Worker
2 changed files with 271 additions and 123 deletions

View File

@@ -0,0 +1,94 @@
#!/usr/bin/python3
#
# Copyright (C) 2023 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
"""Aggregate several cts reports into information files.
Given several cts reports, where a cts report could be a zip file or
test_result.xml, this script convert them into one set of information files.
The reports must be based on the same build fingerprint.
"""
import argparse
import os
import tempfile
import zipfile
import parse_cts_report
def aggregate_cts_reports(report_files):
"""Aggregate all report files and produce information files to output_dir.
If the results of the same test are different in two reports, choose the one
with a higher priority, following the order: PASS > IGNORED
> ASSUMPTION_FAILURE > FAIL > TEST_ERROR > TEST_STATUS_UNSPECIFIED.
Args:
report_files: A list of paths to cts reports.
Raises:
UserWarning: Report files not compatible.
Returns:
A dictionary that maps build_fingerprint to a CtsReport object.
"""
first_report_file = report_files[0]
report = parse_cts_report.parse_report_file(first_report_file)
with tempfile.TemporaryDirectory() as temp_dir:
for report_file in report_files[1:]:
xml_path = (
parse_cts_report.extract_xml_from_zip(report_file, temp_dir)
if zipfile.is_zipfile(report_file)
else report_file)
test_info = parse_cts_report.get_test_info_xml(xml_path)
if not report.is_compatible(test_info):
msg = (f'{report_file} is incompatible to {first_report_file}.')
raise UserWarning(msg)
report.read_test_result_xml(xml_path)
return report
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--report-files', required=True, nargs='+',
help=('Path to cts report(s), where a cts report could '
'be a zip archive or a xml file.'))
parser.add_argument('-d', '--output-dir', required=True,
help=('Path to the directory to store output files.'))
args = parser.parse_args()
report_files = args.report_files
output_dir = args.output_dir
if not os.path.exists(output_dir):
raise FileNotFoundError(f'Output directory {output_dir} does not exist.')
report = aggregate_cts_reports(report_files)
report.output_files(output_dir)
if __name__ == '__main__':
main()

View File

@@ -14,13 +14,10 @@
# License for the specific language governing permissions and limitations under
# the License.
#
"""Turn a cts report into three information files.
"""Convert single cts report into information files.
Given a zip file or a test_result.xml, this script read the xml file
and produce three output files:
info.json
result.csv
summary.csv
Given a cts report, which could be a zip file or test_result.xml, this script
turns them into three files: info.json, result.csv, and summary.csv.
"""
import argparse
@@ -28,31 +25,163 @@ import csv
import json
import os
import shutil
import tempfile
import xml.etree.ElementTree as ET
import zipfile
# TODO(b/293809772): Logging test result.
# TODO(b/293809772): Aggregate several CTS reports.
# TODO(b/293809772): Logging.
class ModuleSummary():
"""Class to record the test result summary of a cts module."""
class CtsReport:
"""Class to record the test result of a cts report."""
def __init__(self):
self.counter = {
'pass': 0,
'fail': 0,
'IGNORED': 0,
'ASSUMPTION_FAILURE': 0,
'TEST_ERROR': 0,
'TEST_STATUS_UNSPECIFIED': 0,
}
STATUS_ORDER = ['pass', 'IGNORED', 'ASSUMPTION_FAILURE', 'fail',
'TEST_ERROR', 'TEST_STATUS_UNSPECIFIED']
def print_info(self):
for key, value in self.counter.items():
print(f'{key}: {value}')
print()
def __init__(self, info):
self.info = info
self.result_tree = {}
self.module_summaries = {}
def is_compatible(self, info):
return self.info['build_fingerprint'] == info['build_fingerprint']
def get_test_status(self, module_name, abi, class_name, test_name):
"""Get test status from the CtsReport object."""
if module_name not in self.result_tree:
return None
abis = self.result_tree[module_name]
if abi not in abis:
return None
test_classes = abis[abi]
if class_name not in test_classes:
return None
tests = test_classes[class_name]
if test_name not in tests:
return None
return tests[test_name]
def set_test_status(self, module_name, abi,
class_name, test_name, test_status):
"""Set test status to the CtsReport object."""
previous = self.get_test_status(module_name, abi, class_name, test_name)
abis = self.result_tree.setdefault(module_name, {})
test_classes = abis.setdefault(abi, {})
tests = test_classes.setdefault(class_name, {})
if not previous:
tests[test_name] = test_status
module_summary = self.module_summaries.setdefault(module_name, {})
summary = module_summary.setdefault(abi, self.ModuleSummary())
summary.counter[test_status] += 1
elif (CtsReport.STATUS_ORDER.index(test_status)
< CtsReport.STATUS_ORDER.index(previous)):
summary = self.module_summaries[module_name][abi]
tests[test_name] = test_status
summary.counter[previous] -= 1
summary.counter[test_status] += 1
def read_test_result_xml(self, test_result_path):
"""Read the result from test_result.xml into a CtsReport object."""
tree = ET.parse(test_result_path)
root = tree.getroot()
for module in root.iter('Module'):
module_name = module.attrib['name']
abi = module.attrib['abi']
for testcase in module.iter('TestCase'):
class_name = testcase.attrib['name']
for test in testcase.iter('Test'):
test_name = test.attrib['name']
result = test.attrib['result']
self.set_test_status(module_name, abi,
class_name, test_name, result)
def write_to_csv(self, result_csvfile, summary_csvfile):
"""Write the information of the report to the csv files.
Args:
result_csvfile: path to result.csv
summary_csvfile: path to summary.csv
"""
summary_writer = csv.writer(summary_csvfile)
summary_writer.writerow(['module_name', 'abi'] + CtsReport.STATUS_ORDER)
result_writer = csv.writer(result_csvfile)
result_writer.writerow(['module_name', 'abi',
'class_name', 'test_name', 'result'])
modules = self.result_tree
for module_name, abis in modules.items():
for abi, test_classes in abis.items():
module_summary = self.module_summaries[module_name][abi]
summary = module_summary.summary_list()
summary_writer.writerow([module_name, abi] + summary)
for class_name, tests in test_classes.items():
for test_name, result in tests.items():
result_writer.writerow([module_name, abi,
class_name, test_name, result])
def output_files(self, output_dir):
"""Produce output files into the directory."""
parsed_info_path = os.path.join(output_dir, 'info.json')
parsed_result_path = os.path.join(output_dir, 'result.csv')
parsed_summary_path = os.path.join(output_dir, 'summary.csv')
files = [parsed_info_path, parsed_result_path, parsed_summary_path]
for f in files:
if os.path.exists(f):
raise FileExistsError(f'Output file {f} already exists.')
with open(parsed_info_path, 'w') as info_file:
info_file.write(json.dumps(self.info, indent=2))
with (
open(parsed_result_path, 'w') as result_csvfile,
open(parsed_summary_path, 'w') as summary_csvfile,
):
self.write_to_csv(result_csvfile, summary_csvfile)
for f in files:
print(f'Parsed output {f}')
return files
class ModuleSummary:
"""Record the result summary of each (module, abi) pair."""
def __init__(self):
self.counter = {}
for status in CtsReport.STATUS_ORDER:
self.counter[status] = 0
def print_summary(self):
for key in CtsReport.STATUS_ORDER:
print(f'{key}: {self.counter[key]}')
print()
def summary_list(self):
return [self.counter[type] for type in CtsReport.STATUS_ORDER]
ATTRS_TO_SHOW = ['Result::Build.build_model',
@@ -68,12 +197,6 @@ ATTRS_TO_SHOW = ['Result::Build.build_model',
'Result.suite_version',
'Result.suite_plan',
'Result.suite_build_number',]
RESULTS = ['pass',
'fail',
'IGNORED',
'ASSUMPTION_FAILURE',
'TEST_ERROR',
'TEST_STATUS_UNSPECIFIED',]
def parse_attrib_path(attrib_path):
@@ -84,10 +207,13 @@ def parse_attrib_path(attrib_path):
return tags, attr_name
def get_test_info(root):
"""Get test info from xml tree."""
def get_test_info_xml(test_result_path):
"""Get test info from xml file."""
test_info = {}
tree = ET.parse(test_result_path)
root = tree.getroot()
test_info = {'source_path': test_result_path}
for attrib_path in ATTRS_TO_SHOW:
tags, attr_name = parse_attrib_path(attrib_path)
@@ -105,11 +231,9 @@ def get_test_info(root):
return test_info
def print_test_info(test_result):
def print_test_info(info):
"""Print test information of the result in table format."""
info = test_result['info']
max_key_len = max([len(k) for k in info])
max_value_len = max([len(info[k]) for k in info])
table_len = (max_key_len + 2 + max_value_len)
@@ -123,85 +247,34 @@ def print_test_info(test_result):
print()
def extract_xml_from_zip(zip_file_path, output_dir):
def extract_xml_from_zip(zip_file_path, dest_dir):
"""Extract test_result.xml from the zip file."""
sub_dir_name = os.path.splitext(os.path.basename(zip_file_path))[0]
xml_path = os.path.join(sub_dir_name, 'test_result.xml')
extracted_xml = os.path.join(output_dir, 'test_result.xml')
extracted_xml = os.path.join(dest_dir, 'test_result.xml')
with zipfile.ZipFile(zip_file_path) as myzip:
with myzip.open(xml_path) as source, open(extracted_xml, 'wb') as target:
shutil.copyfileobj(source, target)
return extracted_xml
def read_test_result_xml(test_result_path):
"""Given the path to a test_result.xml, read that into a dict."""
def parse_report_file(report_file):
"""Turn one cts report into a CtsReport object."""
tree = ET.parse(test_result_path)
root = tree.getroot()
with tempfile.TemporaryDirectory() as temp_dir:
xml_path = (
extract_xml_from_zip(report_file, temp_dir)
if zipfile.is_zipfile(report_file)
else report_file)
test_result = {}
test_result['info'] = get_test_info(root)
test_info = get_test_info_xml(xml_path)
print_test_info(test_info)
modules = {}
test_result['modules'] = modules
report = CtsReport(test_info)
report.read_test_result_xml(xml_path)
for module in root.iter('Module'):
module_name = module.attrib['name']
abi_name = module.attrib['abi']
abis = modules.setdefault(module_name, {})
testcases = abis.setdefault(abi_name, {})
for testcase in module.iter('TestCase'):
testcase_name = testcase.attrib['name']
tests = testcases.setdefault(testcase_name, {})
for test in testcase.iter('Test'):
test_name = test.attrib['name']
if test_name in tests:
print('[WARNING] duplicated test:', test_name)
tests[test_name] = test.attrib['result']
return test_result
def write_to_csv(test_result, result_csvfile, summary_csvfile):
"""Given a result dict, write to the csv files.
Args:
test_result: the dict returned from read_test_result(test_result.xml)
result_csvfile: path to result.csv
summary_csvfile: path to summary.csv
"""
result_writer = csv.writer(result_csvfile)
result_writer.writerow(['module_name', 'abi',
'class_name', 'test_name', 'result'])
summary_writer = csv.writer(summary_csvfile)
summary_writer.writerow(['module', 'abi', 'pass', 'fail', 'IGNORED',
'ASSUMPTION_FAILURE', 'TEST_ERROR',
'TEST_STATUS_UNSPECIFIED'])
modules = test_result['modules']
for module_name, abis in modules.items():
module_result_summary = ModuleSummary()
for abi_name, testcases in abis.items():
for testcase_name, tests in testcases.items():
for test_name, result in tests.items():
result_writer.writerow([module_name, abi_name,
testcase_name, test_name, result])
module_result_summary.counter[result] += 1
summary = [module_result_summary.counter[result] for result in RESULTS]
summary_writer.writerow([module_name, abi_name] + summary)
return report
def main():
@@ -215,34 +288,15 @@ def main():
args = parser.parse_args()
report_file = args.report_file
output_dir = args.output_dir
if not os.path.exists(output_dir):
raise FileNotFoundError(f'Output directory {output_dir} does not exist.')
xml_path = (
extract_xml_from_zip(args.report_file, output_dir)
if zipfile.is_zipfile(args.report_file)
else args.report_file)
test_result = read_test_result_xml(xml_path)
print_test_info(test_result)
parsed_info_path = os.path.join(output_dir, 'info.json')
parsed_result_path = os.path.join(output_dir, 'result.csv')
parsed_summary_path = os.path.join(output_dir, 'summary.csv')
with open(parsed_info_path, 'w') as info_file:
info_file.write(json.dumps(test_result['info'], indent=2))
with (
open(parsed_result_path, 'w') as result_csvfile,
open(parsed_summary_path, 'w') as summary_csvfile,
):
write_to_csv(test_result, result_csvfile, summary_csvfile)
for f in [parsed_info_path, parsed_result_path, parsed_summary_path]:
print(f'Parsed output {f}')
report = parse_report_file(report_file)
report.output_files(output_dir)
if __name__ == '__main__':
main()