Merge "Remove gsi_util"

This commit is contained in:
Treehugger Robot
2021-12-21 14:25:06 +00:00
committed by Gerrit Code Review
41 changed files with 0 additions and 2704 deletions

View File

@@ -1,5 +0,0 @@
bin/
lib/
lib64/
gsi_util.bin
gsi_util.zip

View File

@@ -1,46 +0,0 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package {
default_applicable_licenses: ["Android-Apache-2.0"],
}
python_binary_host {
name: "gsi_util",
srcs: [
"gsi_util.py",
"gsi_util/*.py",
"gsi_util/checkers/*.py",
"gsi_util/commands/*.py",
"gsi_util/commands/common/*.py",
"gsi_util/dumpers/*.py",
"gsi_util/mounters/*.py",
"gsi_util/utils/*.py",
],
required: [
"adb",
"avbtool",
"checkvintf",
"secilc",
"simg2img",
],
version: {
py2: {
enabled: true,
},
py3: {
enabled: false,
},
},
}

View File

View File

@@ -1,160 +0,0 @@
#!/usr/bin/env python
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A utility to build and pack gsi_util."""
import argparse
from collections import namedtuple
import errno
import logging
import os
import shutil
import sys
import zipfile
from gsi_util.utils.cmd_utils import run_command
_MAKE_MODULE_NAME = 'gsi_util'
RequiredItem = namedtuple('RequiredItem', 'dest src')
# The list of dependency modules
# Format in (dest, src in host out)
REQUIRED_ITEMS = [
# named as 'gsi_util.bin' to avoid conflict with the folder 'gsi_util'
RequiredItem('gsi_util.bin', 'bin/gsi_util'),
RequiredItem('bin/checkvintf', 'bin/checkvintf'),
RequiredItem('lib64/libbase.so', 'lib64/libbase.so'),
RequiredItem('lib64/liblog.so', 'lib64/liblog.so'),
RequiredItem('bin/secilc', 'bin/secilc'),
RequiredItem('lib64/libsepol.so', 'lib64/libsepol.so'),
RequiredItem('bin/simg2img', 'bin/simg2img'),
RequiredItem('lib64/libc++.so', 'lib64/libc++.so'),
] # pyformat: disable
# Files to be included to zip file additionally
INCLUDE_FILES = [
'README.md'] # pyformat: disable
def _check_android_env():
if not os.environ.get('ANDROID_BUILD_TOP'):
raise EnvironmentError('Need \'lunch\'.')
def _switch_to_prog_dir():
prog = sys.argv[0]
abspath = os.path.abspath(prog)
dirname = os.path.dirname(abspath)
os.chdir(dirname)
def _make_all():
logging.info('Make %s...', _MAKE_MODULE_NAME)
build_top = os.environ['ANDROID_BUILD_TOP']
run_command(['make', '-j', _MAKE_MODULE_NAME], cwd=build_top)
def _create_dirs_and_copy_file(dest, src):
dir_path = os.path.dirname(dest)
try:
if dir_path != '':
os.makedirs(dir_path)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
logging.debug('copy(): %s %s', src, dest)
shutil.copy(src, dest)
def _copy_deps():
logging.info('Copy depend files...')
host_out = os.environ['ANDROID_HOST_OUT']
logging.debug(' ANDROID_HOST_OUT=%s', host_out)
for item in REQUIRED_ITEMS:
print 'Copy {}...'.format(item.dest)
full_src = os.path.join(host_out, item.src)
_create_dirs_and_copy_file(item.dest, full_src)
def _build_zipfile(filename):
print 'Archive to {}...'.format(filename)
with zipfile.ZipFile(filename, mode='w') as zf:
for f in INCLUDE_FILES:
print 'Add {}'.format(f)
zf.write(f)
for f in REQUIRED_ITEMS:
print 'Add {}'.format(f[0])
zf.write(f[0])
def do_setup_env(args):
_check_android_env()
_make_all()
_switch_to_prog_dir()
_copy_deps()
def do_list_deps(args):
print 'Depend files (zip <== host out):'
for item in REQUIRED_ITEMS:
print ' {:20} <== {}'.format(item.dest, item.src)
print 'Other include files:'
for item in INCLUDE_FILES:
print ' {}'.format(item)
def do_build(args):
_check_android_env()
_make_all()
_switch_to_prog_dir()
_copy_deps()
_build_zipfile(args.output)
def main(argv):
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(title='COMMAND')
# Command 'setup_env'
setup_env_parser = subparsers.add_parser(
'setup_env',
help='setup environment by building and copying dependency files')
setup_env_parser.set_defaults(func=do_setup_env)
# Command 'list_dep'
list_dep_parser = subparsers.add_parser(
'list_deps', help='list all dependency files')
list_dep_parser.set_defaults(func=do_list_deps)
# Command 'build'
# TODO(szuweilin@): do not add this command if this is runned by package
build_parser = subparsers.add_parser(
'build', help='build a zip file including all required files')
build_parser.add_argument(
'-o',
'--output',
default='gsi_util.zip',
help='the name of output zip file (default: gsi_util.zip)')
build_parser.set_defaults(func=do_build)
args = parser.parse_args(argv[1:])
args.func(args)
if __name__ == '__main__':
main(sys.argv)

View File

@@ -1,75 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""gsi_util command-line utility."""
import argparse
import logging
import sys
class GsiUtil(object):
"""Object for gsi_util command line tool."""
_GSI_UTIL_VERSION = '1.0'
# Adds gsi_util COMMAND here.
# TODO(bowgotsai): auto collect from gsi_util/commands/*.py
_COMMANDS = ['flash_gsi', 'pull', 'dump', 'check_compat']
_LOGGING_FORMAT = '%(message)s'
_LOGGING_LEVEL = logging.WARNING
@staticmethod
def _get_module_name(command):
return 'gsi_util.commands.' + command
def run(self, argv):
"""Command-line processor."""
# Sets up default logging.
logging.basicConfig(format=self._LOGGING_FORMAT, level=self._LOGGING_LEVEL)
# Adds top-level --version/--debug argument.
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--version', action='version',
version='%(prog)s {}'.format(self._GSI_UTIL_VERSION))
parser.add_argument(
'-d', '--debug', help='debug mode.', action='store_true')
# Adds subparsers for each COMMAND.
subparsers = parser.add_subparsers(title='COMMAND')
for command in self._COMMANDS:
module_name = self._get_module_name(command)
mod = __import__(module_name, globals(), locals(), ['setup_command_args'])
mod.setup_command_args(subparsers)
args = parser.parse_args(argv[1:])
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
try:
args.func(args)
except Exception as e:
logging.error('%s: %s', argv[0], e.message)
if args.debug:
logging.exception(e)
sys.exit(1)
if __name__ == '__main__':
tool = GsiUtil()
tool.run(sys.argv)

View File

@@ -1,28 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A namedtuple to store a check result."""
from collections import namedtuple
CheckResultItem = namedtuple('CheckResultItem', 'title result_ok stderr')
"""The tuple to represent a check result.
Props:
title: A string to summarize the underlying check.
e.g., 'SEPolicy check', 'Binder bitness check', etc.
result_ok: True if the check passed, false otherwise.
stderr: The stderr of the underlying executed command(s).
"""

View File

@@ -1,99 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""class Checker implementation.
Two major functionalities:
1. Gets full or partial check list.
2. Runs a given check list and returns the results.
"""
from collections import namedtuple
from gsi_util.checkers import sepolicy_checker
from gsi_util.checkers import vintf_checker
CheckListItem = namedtuple('CheckListItem', 'check_item checker_class')
# Uses a tuple to prevent functions in this module from returning a mutable
# list to the caller.
_CHECK_LIST = (
CheckListItem('vintf', vintf_checker.VintfChecker),
CheckListItem('sepolicy', sepolicy_checker.SepolicyChecker),
)
class Checker(object):
"""The main checker to maintain and run a full or partial check list."""
def __init__(self, file_accessor):
"""Inits a checker with a given file_accessor.
Args:
file_accessor: Provides file access for the checker to retrieve
required files for a given check item. e.g., getting compatibility
matrix files for VINTF compatibility check, getting SEPolicy files
for SEPolicy merge test.
"""
self._file_accessor = file_accessor
def check(self, check_list):
"""Runs all check items specified in the check_list.
Args:
check_list: A list of CheckListItem() containing multiple checkers.
Returns:
A list of CheckResultItem(), by concatenating the check results of
each item in the check_list. Note that one CheckListItem() might
generate more than one CheckResultItem()s. e.g., Three check list
items in the check_list might return five check result items.
"""
check_result_items = []
for check_item in check_list:
the_checker = check_item.checker_class(self._file_accessor)
# A checker might return multiple CheckResultItem()s.
check_result_items += the_checker.check()
return check_result_items
@staticmethod
def make_check_list(check_items):
"""Returns a list of CheckListItem() by the given item names.
Args:
check_items: A list of CheckListItem().
Raises:
RuntimeError: When any input check_item is unknown or duplicated.
"""
check_list = []
for check_item in check_items:
matched_items = [x for x in _CHECK_LIST if x.check_item == check_item]
if not matched_items:
raise RuntimeError('Unknown check item: {}'.format(check_item))
# Checks there is exactly one match.
if len(matched_items) != 1:
raise RuntimeError(
'Duplicated check items: {} in the check list'.format(check_item))
check_list.append(matched_items[0]) # Appends the only matched item.
return check_list
@staticmethod
def get_all_check_list():
"""Returns the default check list, which contains full check items."""
return _CHECK_LIST

View File

@@ -1,168 +0,0 @@
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Merges SEPolicy files from platform and non-platform.
SEPolic files have been split into two parts: platform V.S. non-platform
(a.k.a. system V.S. vendor/odm, or framework V.S. non-framework).
The former files are stored on /system partition of a device, while the
latter files are stored on /vendor and/or /odm partitions of a device.
When the device boots a GSI, /init will merge SEPolicy files from those
partitions. If the merge fails, device will reboot to fastboot mode.
We can do the same SEPolicy merge on the host side to catch this error,
prior to use a GSI.
The SEPolicy merge logic can be found under function LoadSplitPolicy() in
the following source:
https://android.googlesource.com/platform/system/core/+/master/init/selinux.cpp
"""
import logging
from gsi_util.checkers import check_result
from gsi_util.dumpers import xml_dumper
from gsi_util.utils import sepolicy_utils
# Constants for SEPolicy CIL (common intermediate language) files.
# SEPolicy files on /system.
_PLAT_SEPOLICY_CIL = '/system/etc/selinux/plat_sepolicy.cil'
# SEPolicy files on /vendor.
_VENDOR_VERSION_FILE = '/vendor/etc/selinux/plat_sepolicy_vers.txt'
_NONPLAT_SEPOLICY_CIL = '/vendor/etc/selinux/nonplat_sepolicy.cil'
# _NONPLAT_SEPOLICY_CIL has been renamed/split into the following two files.
_VENDOR_SEPOLICY_CIL = '/vendor/etc/selinux/vendor_sepolicy.cil'
_VENDOR_PLAT_PUB_SEPOLICY_CIL = '/vendor/etc/selinux/plat_pub_versioned.cil'
# SEPolicy file on /odm.
_ODM_SEPOLICY_CIL = '/odm/etc/selinux/odm_sepolicy.cil'
# System compatiblity file, required to get expected kernel sepolicy version.
_SYSTEM_COMPATIBILITY_MATRIX = '/system/compatibility_matrix.xml'
class SepolicyChecker(object): # pylint: disable=too-few-public-methods
"""Checks SEPolicy can be merged between GSI and device images."""
def __init__(self, file_accessor):
"""Inits a SEPolicy checker with a given file_accessor.
Args:
file_accessor: Provides file access to get required sepolicy files
that are installed on /system and /vendor (and/or /odm) partitions of
a device.
"""
self._file_accessor = file_accessor
def _get_vendor_mapping_version(self):
"""Gets the platform sepolicy version that vendor used.
Note that the version is introduced in project Treble and isn't related
to kernel SELinux policy version. In general, it will be aligned with
Android SDK version: 26.0, 27.0, etc. For more details, please refer to:
https://android.googlesource.com/platform/system/sepolicy/+/master/Android.mk
Returns:
A string indicating the version, e.g., '26.0'.
Raises:
RuntimeError: An error occurred when accessing required files.
"""
with self._file_accessor.prepare_file(_VENDOR_VERSION_FILE) as version:
if not version:
raise RuntimeError('Failed to open: {}'.format(_VENDOR_VERSION_FILE))
return open(version).readline().strip()
def _get_kernel_policy_version(self):
"""Gets the kernel policy version that framework expects.
The version is the SELinux policy version used by kernel. It can be
obtained via '/sys/fs/selinux/policyvers' on a running device. The
version is also included in system compatibility matrix to denote
the policy version used in system sepolicy files.
Returns:
A string indicating the kernel SELinux policy version, e.g., '30'.
"""
# XmlDumper expects the 2nd argument as a sequence.
# And it only takes the first element as the file name to open.
with xml_dumper.XmlDumper(
self._file_accessor,
(_SYSTEM_COMPATIBILITY_MATRIX,)) as dumper_instance:
return dumper_instance.dump('./sepolicy/kernel-sepolicy-version')
def check(self):
"""Merges system and vendor/odm SEPolicy files.
Returns:
A list of a single check_result.CheckResultItem() tuple.
Raises:
RuntimeError: An error occurred when accessing required files.
"""
policy_version = self._get_kernel_policy_version()
secilc_options = {'multiple-decl': None, 'mls': 'true',
'expand-generated': None, 'disable-neverallow': None,
'policyvers': policy_version, 'output': '/dev/null',
'filecontext': '/dev/null'}
cil_files = []
# Selects the mapping file based on vendor-used platform version.
vendor_plat_vers = self._get_vendor_mapping_version()
mapping_sepolicy_cil = '/system/etc/selinux/mapping/{}.cil'.format(
vendor_plat_vers)
with self._file_accessor.prepare_multi_files([
_PLAT_SEPOLICY_CIL,
mapping_sepolicy_cil,
_NONPLAT_SEPOLICY_CIL,
_VENDOR_SEPOLICY_CIL,
_VENDOR_PLAT_PUB_SEPOLICY_CIL,
_ODM_SEPOLICY_CIL]) as [plat_sepolicy, mapping_sepolicy,
nonplat_sepolicy, vendor_sepolicy,
vendor_plat_pub_sepolicy, odm_sepolicy]:
if not plat_sepolicy:
raise RuntimeError('Failed to open: {}'.format(_PLAT_SEPOLICY_CIL))
if not mapping_sepolicy:
raise RuntimeError('Failed to open: {}'.format(mapping_sepolicy_cil))
cil_files += [plat_sepolicy, mapping_sepolicy]
# nonplat_sepolicy has been split to vendor_sepolicy +
# vendor_plat_pub_sepolicy. Here we support both configs.
if nonplat_sepolicy: # For legacy devices in Oreo/Oreo-MR1.
cil_files += [nonplat_sepolicy]
logging.debug('Using nonplat sepolicy: %r', _NONPLAT_SEPOLICY_CIL)
elif vendor_sepolicy and vendor_plat_pub_sepolicy:
cil_files += [vendor_sepolicy, vendor_plat_pub_sepolicy]
logging.debug('Using vendor sepolicy: %r and %r',
_VENDOR_SEPOLICY_CIL, _VENDOR_PLAT_PUB_SEPOLICY_CIL)
else:
raise RuntimeError(
'Failed to open vendor sepolicy files.\n'
'Either {!r} or {!r}/{!r} should present'.format(
_NONPLAT_SEPOLICY_CIL, _VENDOR_SEPOLICY_CIL,
_VENDOR_PLAT_PUB_SEPOLICY_CIL))
if odm_sepolicy: # odm_sepolicy is optional.
cil_files += [odm_sepolicy]
# Executes the merge command.
result_ok, stderr = sepolicy_utils.secilc(secilc_options, cil_files)
# The caller (checker) expects to receive a list of CheckResultItems.
return [check_result.CheckResultItem('SEPolicy merge test',
result_ok,
stderr)]

View File

@@ -1,78 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs Treble compatibility check between /system and /vendor.
One of the major goal of project Treble is to do system-only OTA across
major Android releases (e.g., O -> P). VINTF check is to ensure a given
system.img can work well on a vendor.img, including HALs versions match,
kernel version match, SEPolicy version match, etc. See the following link
for more details:
https://source.android.com/devices/architecture/vintf/
"""
from gsi_util.checkers import check_result
from gsi_util.utils import vintf_utils
class VintfChecker(object): # pylint: disable=too-few-public-methods
"""The checker to perform VINTF check between /system and /vendor."""
# A dict to specify required VINTF checks.
# Each item is a tuple containing a (manifest, matrix) pair for the match
# check.
_REQUIRED_CHECKS = {
'Framework manifest match': ('/system/manifest.xml',
'/vendor/compatibility_matrix.xml'),
'Device manifest match': ('/vendor/manifest.xml',
'/system/compatibility_matrix.xml'),
}
def __init__(self, file_accessor):
"""Inits a VINTF checker with a given file_accessor.
Args:
file_accessor: Provides file access to get files that are installed
on /system and /vendor partition of a device.
"""
self._file_accessor = file_accessor
def check(self):
"""Performs the Treble VINTF compatibility check.
Returns:
A list of check_result.CheckResultItem() tuples.
Raises:
RuntimeError: An error occurred when accessing required files.
"""
check_result_items = []
for title in self._REQUIRED_CHECKS:
manifest_filename, matrix_filename = self._REQUIRED_CHECKS[title]
with self._file_accessor.prepare_multi_files(
[manifest_filename, matrix_filename]) as [manifest, matrix]:
if not manifest:
raise RuntimeError('Failed to open: {}'.format(manifest_filename))
if not matrix:
raise RuntimeError('Failed to open: {}'.format(matrix_filename))
# Runs the check item and appends the result.
result_ok, stderr = vintf_utils.checkvintf(manifest, matrix)
check_result_items.append(
check_result.CheckResultItem(title, result_ok, stderr))
return check_result_items

View File

@@ -1,175 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides command 'check_compat'."""
from __future__ import print_function
import argparse
import logging
from gsi_util.checkers import checker
from gsi_util.commands.common import image_sources
class CheckReporter(object):
"""Outputs the checker result with formatting."""
# The output will look like:
#
# check result 1 : pass
# check result 2 : pass
#
# ------------------------------------
# Pass/Total : 2/2
_OUTPUT_FORMAT = '{:30}: {}'
_ERR_MSE_FORMAT = ' {}'
_OUTPUT_MAX_LEN = 36
_SUMMARY_NAME = 'Pass/Total'
def __init__(self):
"""Whether to only output a summary result of all checks."""
self._only_summary = False
def set_only_summary(self):
"""Only outputs summary result.
When _only_summary is set, only shows the number of pass items over
the number of total check items.
"""
self._only_summary = True
@staticmethod
def _get_result_str(result_ok):
"""Gets the result string 'pass' or 'fail' based on the check result."""
return 'pass' if result_ok else 'fail'
def _output_result_item(self, result_item):
"""Outputs the result of a CheckResultItem().
Args:
result_item: a namedtuple of check_result.CheckResultItem().
Returns:
True if the test result passed. False otherwise.
"""
title, result_ok, stderr = result_item
if not self._only_summary:
result_str = self._get_result_str(result_ok)
print(self._OUTPUT_FORMAT.format(title, result_str))
if stderr:
print(self._ERR_MSE_FORMAT.format(stderr))
return result_ok
def _output_summary(self, num_pass_items, num_all_items):
"""Outputs a summary of all checker tests.
Args:
num_pass_items: The number of passing tests.
num_all_items: Total number of finished tests.
"""
print('-' * self._OUTPUT_MAX_LEN)
summary_result_str = '{}/{}'.format(num_pass_items, num_all_items)
print(self._OUTPUT_FORMAT.format(self._SUMMARY_NAME, summary_result_str))
def output(self, check_result_items):
"""The main public method to output a sequence of CheckResultItem()s."""
num_pass_items = 0
num_all_items = 0
for result_item in check_result_items:
result_ok = self._output_result_item(result_item)
if result_ok:
num_pass_items += 1
num_all_items += 1
self._output_summary(num_pass_items, num_all_items)
def _format_check_list(check_list):
"""Returns a string of check list item names."""
# The string is like: "'check_item1', 'check_item2', 'check_item3'".
return ', '.join('{!r}'.format(x.check_item) for x in check_list)
def do_list_checks(_):
"""Prints the all supported check items."""
print(_format_check_list(checker.Checker.get_all_check_list()))
def do_check_compat(args):
"""The actual function to do 'gsi_util check_compat' command."""
logging.info('==== CHECK_COMPAT ====')
logging.info(' system=%s vendor=%s', args.system, args.vendor)
check_list = (checker.Checker.make_check_list(args.CHECK_ITEM)
if args.CHECK_ITEM else checker.Checker.get_all_check_list())
logging.debug('Starting check list: %s', _format_check_list(check_list))
mounter = image_sources.create_composite_mounter_by_args(args)
with mounter as file_accessor:
the_checker = checker.Checker(file_accessor)
check_result = the_checker.check(check_list)
reporter = CheckReporter()
if args.only_summary:
reporter.set_only_summary()
reporter.output(check_result)
logging.info('==== DONE ====')
_CHECK_COMPAT_DESC = """
'check_compat' command checks compatibility between images.
You must assign both image sources by SYSTEM and VENDOR.
You could use command 'list_checks' to query all check items:
$ ./gsi_util.py list_checks
Here is an examples to check a system.img and a device are compatible:
$ ./gsi_util.py check_compat --system system.img --vendor adb"""
def setup_command_args(parser):
"""Sets up command 'list_checks' and 'check_compat'."""
# Command 'list_checks'.
list_check_parser = parser.add_parser(
'list_checks', help='lists all possible check items. Run')
list_check_parser.set_defaults(func=do_list_checks)
# command 'check_compat'
check_compat_parser = parser.add_parser(
'check_compat',
help='checks compatibility between a system and a vendor',
description=_CHECK_COMPAT_DESC,
formatter_class=argparse.RawTextHelpFormatter)
check_compat_parser.add_argument(
'-s',
'--only-summary',
action='store_true',
help='only output the summary result')
image_sources.add_argument_group(
check_compat_parser,
required_images=['system', 'vendor'])
check_compat_parser.add_argument(
'CHECK_ITEM',
type=str,
nargs='*',
help=('the check item to be performed\n'
'select one from: {}\n'.format(_format_check_list(
checker.Checker.get_all_check_list())) +
'if not given, it will check all'))
check_compat_parser.set_defaults(func=do_check_compat)

View File

@@ -1,67 +0,0 @@
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provide common implementation of image sources."""
import logging
from gsi_util.mounters import composite_mounter
_DESCRIPTION = """The image sources to be mounted targets.
An image source could be:
adb[:SERIAL_NUM]: form the device which be connected with adb
image file name: from the given image file, e.g. the file name of a GSI.
If a image file is assigned to be the source of system
image, gsi_util will detect system-as-root automatically.
folder name: from the given folder, e.g. the system/vendor folder in an
Android build out folder.
"""
def create_composite_mounter_by_args(args):
"""Creates a CompositeMounter by the images in given args."""
logging.info('Mount images...')
mounter = composite_mounter.CompositeMounter()
for partition in composite_mounter.SUPPORTED_PARTITIONS:
image_source = vars(args)[partition]
if image_source:
logging.info(' %s=%s', partition, image_source)
mounter.add_by_mount_target(partition, image_source)
if mounter.is_empty():
raise RuntimeError('Must give at least one image source.')
return mounter
def add_argument_group(parser, required_images=None):
"""Add a argument group into the given parser for image sources.
Args:
parser: The parser to be added the argument group.
required_images: A list contains the required images. e.g.
['system', 'vendor']. Default is no required images.
"""
# To avoid pylint W0102
required_images = required_images or []
group = parser.add_argument_group('image sources', _DESCRIPTION)
for partition in composite_mounter.SUPPORTED_PARTITIONS:
group.add_argument(
'--' + partition,
type=str,
required=partition in required_images,
help='{} image file name, folder name or "adb"'.format(partition))

View File

@@ -1,148 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gsi_util command 'dump'."""
import argparse
import logging
import sys
from gsi_util.commands.common import image_sources
from gsi_util.dumpers import dumper
class DumpReporter(object):
"""Format and output dump info result to a output stream.
When constructing DumpReporter, you need to give os and name_list.
os is the stream to output the formatted, which should be inherited from
io.IOBase. name_list is a string list describe the info names to be output.
After collected all dump result, calls output() to output the
dump_result_dict. dump_result_dict is a dictionary that maps info names to
theirs result values.
"""
_UNKNOWN_VALUE = '<unknown>'
def __init__(self, os, name_list):
"""Inits DumpReporter with an output stream and an info name list.
Args:
os: the output stream of outputing the report
name_list: the info name list will be output
"""
self._os = os
self._name_list = name_list
self._show_unknown = False
def set_show_unknown(self):
"""Enable force output dump info without dump result.
By default, it doesn't output the dump info in the info name list which
is not in dump result, i.e. the dump_result_dict of output().
"""
self._show_unknown = True
def _output_dump_info(self, info_name, value):
print >> self._os, '{:30}: {}'.format(info_name, value)
def output(self, dump_result_dict):
"""Output the given dump result.
Args:
dump_result_dict: the dump result dictionary to be output
"""
for info_name in self._name_list:
value = dump_result_dict.get(info_name)
if not value:
if not self._show_unknown:
continue
value = self._UNKNOWN_VALUE
self._output_dump_info(info_name, value)
def do_list_dump(_):
for info in dumper.Dumper.get_all_dump_list():
print info.info_name
def do_dump(args):
logging.info('==== DUMP ====')
logging.debug('Info name list: %s', args.INFO_NAME)
dump_list = dumper.Dumper.make_dump_list_by_name_list(args.INFO_NAME) if len(
args.INFO_NAME) else dumper.Dumper.get_all_dump_list()
mounter = image_sources.create_composite_mounter_by_args(args)
with mounter as file_accessor:
d = dumper.Dumper(file_accessor)
dump_result_dict = d.dump(dump_list)
# reserved for output to a file
os = sys.stdout
reporter = DumpReporter(os, (x.info_name for x in dump_list))
if args.show_unknown:
reporter.set_show_unknown()
reporter.output(dump_result_dict)
logging.info('==== DONE ====')
_DUMP_DESCRIPTION = ("""'dump' command dumps information from given image
You must assign at least one image source.
You could use command 'list_dump' to query all info names:
$ ./gsi_util.py list_dump
For example you could use following command to query the security patch level
in an system image file:
$ ./gsi_util.py dump --system system.img system_security_patch_level
You there is no given INFO_NAME, all information will be dumped.
Here are some other usage examples:
$ ./gsi_util.py dump --system adb --vendor adb
$ ./gsi_util.py dump --system system.img --show-unknown
$ ./gsi_util.py dump --system my/out/folder/system""")
def setup_command_args(parser):
# command 'list_dump'
list_dump_parser = parser.add_parser(
'list_dump', help='list all possible info names')
list_dump_parser.set_defaults(func=do_list_dump)
# command 'dump'
dump_parser = parser.add_parser(
'dump',
help='dump information from given image',
description=_DUMP_DESCRIPTION,
formatter_class=argparse.RawTextHelpFormatter)
dump_parser.add_argument(
'-u',
'--show-unknown',
action='store_true',
help='force display the dump info items in list which does not exist')
image_sources.add_argument_group(dump_parser)
dump_parser.add_argument(
'INFO_NAME',
type=str,
nargs='*',
help='the info name to be dumped. Dump all if not given')
dump_parser.set_defaults(func=do_dump)

View File

@@ -1,63 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gsi_util flash_gsi command."""
from gsi_util.utils import cmd_utils
from gsi_util.utils import fastboot_utils
from gsi_util.utils import file_utils
def do_flash_gsi(args):
"""Flashes a GSI image (system.img).
Also erases userdata/metadata partition and disables
Android Verified Boot (AVB).
Args:
args: flash_gsi command arguments.
"""
fastboot_utils.erase() # erases userdata/cache partition
# Not every device has metadata partition, so allow_error is True.
fastboot_utils.erase('metadata', allow_error=True)
# Flashes GSI.
fastboot_utils.flash('system', args.image)
# Disables AVB.
with file_utils.UnopenedTemporaryFile() as vbmeta_image:
# vbmeta flag 2 means disable entire AVB verification.
cmd_utils.run_command(['avbtool', 'make_vbmeta_image',
'--flag', '2',
'--padding_size', '4096',
'--output', vbmeta_image])
# Not every device uses AVB, so allow_error is True.
fastboot_utils.flash('vbmeta', vbmeta_image, allow_error=True)
# Reboots the device.
fastboot_utils.reboot()
def setup_command_args(subparsers):
"""Sets up command args for 'flash_gsi'."""
parser = subparsers.add_parser(
'flash_gsi', help='flash a GSI image',
description=('Flash a GSI image - '
'including erasing userdata, '
'disabling AVB (if the device supports AVB) '
'and erasing metadata partition (if the device has).'))
parser.add_argument('-i', '--image',
help='the GSI image to flash', type=str)
parser.set_defaults(func=do_flash_gsi)

View File

@@ -1,77 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of gsi_util command 'pull'."""
import argparse
import logging
import shutil
import sys
from gsi_util.commands.common import image_sources
def do_pull(args):
logging.info('==== PULL ====')
source, dest = args.SOURCE, args.DEST
mounter = image_sources.create_composite_mounter_by_args(args)
with mounter as file_accessor:
with file_accessor.prepare_file(source) as filename:
if not filename:
print >> sys.stderr, 'Can not dump file: {}'.format(source)
else:
logging.debug('Copy %s -> %s', filename, dest)
shutil.copy(filename, dest)
logging.info('==== DONE ====')
_PULL_DESCRIPTION = ("""'pull' command pulls a file from the give image.
You must assign at least one image source.
SOURCE is the full path file name to pull, which must start with '/' and
includes the mount point. ex.
/system/build.prop
/vendor/compatibility_matrix.xml
Some usage examples:
$ ./gsi_util.py pull --system adb:AB0123456789 /system/manifest.xml
$ ./gsi_util.py pull --vendor adb /vendor/compatibility_matrix.xml
$ ./gsi_util.py pull --system system.img /system/build.prop
$ ./gsi_util.py pull --system my/out/folder/system /system/build.prop""")
def setup_command_args(parser):
# command 'pull'
pull_parser = parser.add_parser(
'pull',
help='pull a file from the given image',
description=_PULL_DESCRIPTION,
formatter_class=argparse.RawTextHelpFormatter)
image_sources.add_argument_group(pull_parser)
pull_parser.add_argument(
'SOURCE',
type=str,
help='the full path file name in given image to be pull')
pull_parser.add_argument(
'DEST',
nargs='?',
default='.',
type=str,
help='the file name or directory to save the pulled file (default: .)')
pull_parser.set_defaults(func=do_pull)

View File

@@ -1,51 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provide the information list for command 'dump'."""
from collections import namedtuple
from gsi_util.dumpers.prop_dumper import PropDumper
from gsi_util.dumpers.xml_dumper import XmlDumper
SYSTEM_MATRIX_DUMPER = (XmlDumper, '/system/compatibility_matrix.xml')
SYSTEM_BUILD_PROP_DUMPER = (PropDumper, '/system/build.prop')
SYSTEM_MANIFEST_DUMPER = (PropDumper, '/system/manifest.xml')
VENDOR_DEFAULT_PROP_DUMPER = (PropDumper, '/vendor/default.prop')
VENDOR_BUILD_PROP_DUMPER = (PropDumper, '/vendor/build.prop')
DumpInfoListItem = namedtuple('DumpInfoListItem',
'info_name dumper_create_args lookup_key')
# The total list of all possible dump info.
# It will be output by the order of the list.
DUMP_LIST = [
# System
DumpInfoListItem('system_fingerprint', SYSTEM_BUILD_PROP_DUMPER, 'ro.build.fingerprint'),
DumpInfoListItem('system_sdk_ver', SYSTEM_BUILD_PROP_DUMPER, 'ro.build.version.sdk'),
DumpInfoListItem('system_vndk_ver', SYSTEM_BUILD_PROP_DUMPER, 'ro.vndk.version'),
DumpInfoListItem('system_security_patch_level', SYSTEM_BUILD_PROP_DUMPER, 'ro.build.version.security_patch'),
DumpInfoListItem('system_kernel_sepolicy_ver', SYSTEM_MATRIX_DUMPER, './sepolicy/kernel-sepolicy-version'),
DumpInfoListItem('system_support_sepolicy_ver', SYSTEM_MATRIX_DUMPER, './sepolicy/sepolicy-version'),
DumpInfoListItem('system_avb_ver', SYSTEM_MATRIX_DUMPER, './avb/vbmeta-version'),
# Vendor
DumpInfoListItem('vendor_fingerprint', VENDOR_BUILD_PROP_DUMPER, 'ro.vendor.build.fingerprint'),
DumpInfoListItem('vendor_vndk_ver', VENDOR_BUILD_PROP_DUMPER, 'ro.vndk.version'),
DumpInfoListItem('vendor_security_patch_level', SYSTEM_BUILD_PROP_DUMPER, 'ro.vendor.build.version.security_patch'),
DumpInfoListItem('vendor_low_ram', VENDOR_BUILD_PROP_DUMPER, 'ro.config.low_ram'),
DumpInfoListItem('vendor_zygote', VENDOR_DEFAULT_PROP_DUMPER, 'ro.zygote'),
DumpInfoListItem('vendor_oem_unlock_supported', VENDOR_DEFAULT_PROP_DUMPER, 'oem_unlock_supported'),
DumpInfoListItem('vendor_adb_secure', VENDOR_DEFAULT_PROP_DUMPER, 'ro.adb.secure'),
] # pyformat: disable

View File

@@ -1,86 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implement dump methods and utils to dump info from a mounter."""
from gsi_util.dumpers.dump_info_list import DUMP_LIST
class Dumper(object):
def __init__(self, file_accessor):
self._file_accessor = file_accessor
@staticmethod
def _dump_by_dumper(dumper_instance, dump_list):
"""Dump info by the given dumper instance according the given dump_list.
Used for dump(), see the comment of dump() for type details.
Args:
dumper_instance: a dumper instance to process dump.
dump_list: a list of dump info to be dump. The items in the list must
relative to dumper_instance.
Returns:
The dump result by dictionary maps info_name to the value of dump result.
"""
dump_result = {}
for dump_info in dump_list:
value = dumper_instance.dump(dump_info.lookup_key)
dump_result[dump_info.info_name] = value
return dump_result
def dump(self, dump_list):
"""Dump info according the given dump_list.
Args:
dump_list: a list of dump info to be dump. See dump_info_list.py for
the detail types.
Returns:
The dump result by dictionary maps info_name to the value of dump result.
"""
dump_result = {}
# query how many different dumpers to dump
dumper_set = set([x.dumper_create_args for x in dump_list])
for dumper_create_args in dumper_set:
# The type of a dumper_create_args is (Class, instantiation args...)
dumper_class = dumper_create_args[0]
dumper_args = dumper_create_args[1:]
# Create the dumper
with dumper_class(self._file_accessor, dumper_args) as dumper_instance:
dump_list_for_the_dumper = (
x for x in dump_list if x.dumper_create_args == dumper_create_args)
dumper_result = self._dump_by_dumper(dumper_instance,
dump_list_for_the_dumper)
dump_result.update(dumper_result)
return dump_result
@staticmethod
def make_dump_list_by_name_list(name_list):
info_list = []
for info_name in name_list:
info = next((x for x in DUMP_LIST if x.info_name == info_name), None)
if not info:
raise RuntimeError('Unknown info name: "{}"'.format(info_name))
info_list.append(info)
return info_list
@staticmethod
def get_all_dump_list():
return DUMP_LIST

View File

@@ -1,44 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides class PropDumper."""
import logging
import re
class PropDumper(object):
def __init__(self, file_accessor, args):
filename_in_mount = args[0]
logging.debug('Parse %s...', filename_in_mount)
with file_accessor.prepare_file(filename_in_mount) as filename:
if filename:
with open(filename) as fp:
self._content = fp.read()
def __enter__(self):
# do nothing
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if hasattr(self, '_content'):
del self._content
def dump(self, lookup_key):
if not hasattr(self, '_content'):
return None
match = re.search('%s=(.*)' % (lookup_key), self._content)
return match.group(1) if match else None

View File

@@ -1,44 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides class XmlDumper."""
import logging
import xml.etree.ElementTree as ET
class XmlDumper(object):
def __init__(self, file_accessor, args):
filename_in_mount = args[0]
logging.debug('Parse %s...', filename_in_mount)
with file_accessor.prepare_file(filename_in_mount) as filename:
if filename:
self._tree = ET.parse(filename)
def __enter__(self):
# do nothing
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if hasattr(self, '_tree'):
del self._tree
def dump(self, lookup_key):
if not hasattr(self, '_tree'):
return None
xpath = lookup_key
results = self._tree.findall(xpath)
return ', '.join([e.text for e in results]) if results else None

View File

@@ -1,85 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides class AdbMounter.
The AdbMounter implements the abstract class BaseMounter. It can get files from
a device which is connected by adb.
"""
import errno
import logging
import os
import shutil
import tempfile
from gsi_util.mounters import base_mounter
from gsi_util.utils import adb_utils
class _AdbFileAccessor(base_mounter.BaseFileAccessor):
"""Provides file access over an adb connection."""
def __init__(self, temp_dir, serial_num):
super(_AdbFileAccessor, self).__init__()
self._temp_dir = temp_dir
self._serial_num = serial_num
@staticmethod
def _make_parent_dirs(filename):
"""Make parent directories as needed, no error if it exists."""
dir_path = os.path.dirname(filename)
try:
os.makedirs(dir_path)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
# override
def _handle_prepare_file(self, filename_in_storage):
filename = os.path.join(self._temp_dir, filename_in_storage)
logging.info('_AdbFileAccessor: Prepare file %s -> %s',
filename_in_storage, filename)
self._make_parent_dirs(filename)
if not adb_utils.pull(filename, filename_in_storage, self._serial_num):
logging.info(' Fail to prepare file: %s', filename_in_storage)
return None
return base_mounter.MounterFile(filename)
class AdbMounter(base_mounter.BaseMounter):
"""Provides a file accessor which can access files by adb."""
def __init__(self, serial_num=None):
super(AdbMounter, self).__init__()
self._serial_num = serial_num
self._temp_dir = None
# override
def _handle_mount(self):
adb_utils.root(self._serial_num)
self._temp_dir = tempfile.mkdtemp()
logging.debug('Created temp dir: %s', self._temp_dir)
return _AdbFileAccessor(self._temp_dir, self._serial_num)
# override
def _handle_unmount(self):
if self._temp_dir:
logging.debug('Removing temp dir: %s', self._temp_dir)
shutil.rmtree(self._temp_dir)
self._temp_dir = None

View File

@@ -1,180 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base classes to implement Mounter classes."""
import abc
import logging
class MounterFile(object):
def __init__(self, filename, cleanup_func=None):
self._filename = filename
self._clean_up_func = cleanup_func
def _handle_get_filename(self):
return self._filename
def _handle_clean_up(self):
if self._clean_up_func:
self._clean_up_func()
def __enter__(self):
return self._handle_get_filename()
def __exit__(self, exc_type, exc_val, exc_tb):
self._handle_clean_up()
def get_filename(self):
return self._handle_get_filename()
def clean_up(self):
self._handle_clean_up()
class MounterFileList(object):
def __init__(self, file_list):
self._file_list = file_list
def _handle_get_filenames(self):
return [x.get_filename() for x in self._file_list]
def _handle_clean_up(self):
for x in reversed(self._file_list):
x.clean_up()
def __enter__(self):
return self._handle_get_filenames()
def __exit__(self, exc_type, exc_val, exc_tb):
self._handle_clean_up()
def get_filenames(self):
return self._handle_get_filenames()
def clean_up(self):
self._handle_clean_up()
class BaseFileAccessor(object):
"""An abstract class to implement the file accessors.
A mounter returns a file accessor when it is mounted. A file accessor must
override the method _handle_prepare_file() to return the file name of
the requested file in the storage. However, files in some mounter storages
couldn't be access directly, e.g. the file accessor of AdbMounter, which
accesses the file in a device by adb. In this case, file accessor could
return a temp file which contains the content. A file accessor could give the
cleanup_func when creating MounterFile to cleanup the temp file.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, path_prefix='/'):
logging.debug('BaseFileAccessor(path_prefix=%s)', path_prefix)
self._path_prefix = path_prefix
def _get_pathfile_to_access(self, file_to_map):
path_prefix = self._path_prefix
if not file_to_map.startswith(path_prefix):
raise RuntimeError('"%s" does not start with "%s"', file_to_map,
path_prefix)
return file_to_map[len(path_prefix):]
@abc.abstractmethod
def _handle_prepare_file(self, filename_in_storage):
"""Override this method to prepare the given file in the storage.
Args:
filename_in_storage: the file in the storage to be prepared
Returns:
Return an MounterFile instance. Return None if the request file is not
in the mount.
"""
def prepare_file(self, filename_in_mount):
"""Return the accessable file name in the storage.
The function prepares a accessable file which contains the content of the
filename_in_mount.
See BaseFileAccessor for the detail.
Args:
filename_in_mount: the file to map.
filename_in_mount should be a full path file as the path in a real
device, and must start with a '/'. For example: '/system/build.prop',
'/vendor/default.prop', '/init.rc', etc.
Returns:
A MounterFile instance. Return None if the file is not exit in the
storage.
"""
filename_in_storage = self._get_pathfile_to_access(filename_in_mount)
ret = self._handle_prepare_file(filename_in_storage)
return ret if ret else MounterFile(None)
def prepare_multi_files(self, filenames_in_mount):
file_list = [self.prepare_file(x) for x in filenames_in_mount]
return MounterFileList(file_list)
class BaseMounter(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def _handle_mount(self):
"""Override this method to handle mounting and return a file accessor.
File accessor must inherit from BaseFileAccessor.
"""
def _handle_unmount(self):
"""Override this method to handle cleanup this mounting."""
# default is do nothing
return
def _process_mount(self):
if self._mounted:
raise RuntimeError('The mounter had been mounted.')
file_accessor = self._handle_mount()
self._mounted = True
return file_accessor
def _process_unmount(self):
if self._mounted:
self._handle_unmount()
self._mounted = False
def __init__(self):
self._mounted = False
def __enter__(self):
return self._process_mount()
def __exit__(self, exc_type, exc_val, exc_tb):
self._process_unmount()
def mount(self):
return self._process_mount()
def unmount(self):
self._process_unmount()

View File

@@ -1,128 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides class CompositeMounter.
CompositeMounter implements the abstract class BaseMounter. It can add multiple
mounters inside as sub-mounters, and operate these sub-mounters with the
BaseMounter interface. Uses CompositeMounter.add_sub_mounter() to add
sub-mounter.
Usually, using CompositeMounter.add_by_mount_target() to add mounters is easier,
the method uses class _MounterFactory to create a mounter and then adds it.
class _MounterFactory provides a method to create a mounter by 'mounter_target'.
'mounter_target' is a name which identify what is the file source to be
mounted. See _MounterFactory.create_by_mount_target() for the detail.
"""
import logging
import os
from gsi_util.mounters import adb_mounter
from gsi_util.mounters import base_mounter
from gsi_util.mounters import folder_mounter
from gsi_util.mounters import image_mounter
SUPPORTED_PARTITIONS = ['system', 'vendor', 'odm']
class _MounterFactory(object):
@classmethod
def create_by_mount_target(cls, mount_target, partition):
"""Create a proper Mounter instance by a string of mount target.
Args:
partition: the partition to be mounted as
mount_target: 'adb', a folder name or an image file name to mount.
see Returns for the detail.
Returns:
Returns an AdbMounter if mount_target is 'adb[:SERIAL_NUM]'
Returns a FolderMounter if mount_target is a folder name
Returns an ImageMounter if mount_target is an image file name
Raises:
ValueError: partiton is not support or mount_target is not exist.
"""
if partition not in SUPPORTED_PARTITIONS:
raise ValueError('Wrong partition name "{}"'.format(partition))
if mount_target == 'adb' or mount_target.startswith('adb:'):
(_, _, serial_num) = mount_target.partition(':')
return adb_mounter.AdbMounter(serial_num)
path_prefix = '/{}/'.format(partition)
if os.path.isdir(mount_target):
return folder_mounter.FolderMounter(mount_target, path_prefix)
if os.path.isfile(mount_target):
if partition == 'system':
path_prefix = image_mounter.ImageMounter.DETECT_SYSTEM_AS_ROOT
return image_mounter.ImageMounter(mount_target, path_prefix)
raise ValueError('Unknown target "{}"'.format(mount_target))
class _CompositeFileAccessor(base_mounter.BaseFileAccessor):
def __init__(self, file_accessors):
super(_CompositeFileAccessor, self).__init__()
self._file_accessors = file_accessors
# override
def _handle_prepare_file(self, filename_in_storage):
logging.debug('_CompositeFileAccessor._handle_prepare_file(%s)',
filename_in_storage)
pathfile_to_prepare = '/' + filename_in_storage
for (prefix_path, file_accessor) in self._file_accessors:
if pathfile_to_prepare.startswith(prefix_path):
return file_accessor.prepare_file(pathfile_to_prepare)
logging.debug(' Not found')
return None
class CompositeMounter(base_mounter.BaseMounter):
"""Implements a BaseMounter which can add multiple sub-mounters."""
def __init__(self):
super(CompositeMounter, self).__init__()
self._mounters = []
def is_empty(self):
return not self._mounters
# override
def _handle_mount(self):
file_accessors = [(path_prefix, mounter.mount())
for (path_prefix, mounter) in self._mounters]
return _CompositeFileAccessor(file_accessors)
# override
def _handle_unmount(self):
for (_, mounter) in reversed(self._mounters):
mounter.unmount()
def add_sub_mounter(self, mount_point, mounter):
self._mounters.append((mount_point, mounter))
def add_by_mount_target(self, partition, mount_target):
logging.debug('CompositeMounter.add_by_mount_target(%s, %s)',
partition, mount_target)
mount_point = '/{}/'.format(partition)
mounter = _MounterFactory.create_by_mount_target(mount_target, partition)
self.add_sub_mounter(mount_point, mounter)

View File

@@ -1,55 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides class FolderMounter.
The FolderMounter implements the abstract class BaseMounter. It can
get files from a given folder. The folder is usually the system/vendor folder
of $OUT folder in an Android build environment.
"""
import logging
import os
from gsi_util.mounters import base_mounter
class _FolderFileAccessor(base_mounter.BaseFileAccessor):
def __init__(self, folder_dir, path_prefix):
super(_FolderFileAccessor, self).__init__(path_prefix)
self._folder_dir = folder_dir
# override
def _handle_prepare_file(self, filename_in_storage):
filename = os.path.join(self._folder_dir, filename_in_storage)
logging.info('_FolderFileAccessor: Prepare file %s -> %s',
filename_in_storage, filename)
if not os.path.isfile(filename):
logging.info(' File is not exist: %s', filename_in_storage)
return None
return base_mounter.MounterFile(filename)
class FolderMounter(base_mounter.BaseMounter):
"""Provides a file accessor which can access files in the given folder."""
def __init__(self, folder_dir, path_prefix):
super(FolderMounter, self).__init__()
self._folder_dir = folder_dir
self._path_prefix = path_prefix
# override
def _handle_mount(self):
return _FolderFileAccessor(self._folder_dir, self._path_prefix)

View File

@@ -1,135 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides class ImageMounter.
The ImageMounter implements the abstract class BaseMounter,
It can get files from an image file. e.g., system.img or vendor.img.
"""
import errno
import logging
import os
import shutil
import tempfile
from gsi_util.mounters import base_mounter
from gsi_util.utils import debugfs
from gsi_util.utils import image_utils
class _ImageFileAccessor(base_mounter.BaseFileAccessor):
@staticmethod
def _make_parent_dirs(filename):
"""Make parent directories as needed, no error if it exists."""
dir_path = os.path.dirname(filename)
try:
os.makedirs(dir_path)
except OSError as exc:
if exc.errno != errno.EEXIST:
raise
def __init__(self, path_prefix, raw_image_file, temp_dir):
super(_ImageFileAccessor, self).__init__(path_prefix)
self._raw_image_file = raw_image_file
self._temp_dir = temp_dir
# override
def _handle_prepare_file(self, filename_in_storage):
filespec = os.path.join('/', filename_in_storage)
out_file = os.path.join(self._temp_dir, filename_in_storage)
logging.info('_ImageFileAccessor: Prepare file %s -> %s',
filename_in_storage, out_file)
self._make_parent_dirs(out_file)
if not debugfs.dump(self._raw_image_file, filespec, out_file):
logging.info(' File does not exist: %s', filename_in_storage)
return None
return base_mounter.MounterFile(out_file)
class ImageMounter(base_mounter.BaseMounter):
"""Provides a file accessor which can access files in the given image file."""
DETECT_SYSTEM_AS_ROOT = 'detect-system-as-root'
_SYSTEM_FILES = ['compatibility_matrix.xml', 'build.prop']
def __init__(self, image_filename, path_prefix):
super(ImageMounter, self).__init__()
self._image_filename = image_filename
self._path_prefix = path_prefix
@classmethod
def _detect_system_as_root(cls, raw_image_file):
"""Returns True if the image layout of raw_image_file is system-as-root."""
logging.debug('Checking system-as-root in %s...', raw_image_file)
system_without_root = True
for filename in cls._SYSTEM_FILES:
file_spec = os.path.join('/', filename)
if debugfs.get_type(raw_image_file, file_spec) != 'regular':
system_without_root = False
break
system_as_root = True
for filename in cls._SYSTEM_FILES:
file_spec = os.path.join('/system', filename)
if debugfs.get_type(raw_image_file, file_spec) != 'regular':
system_as_root = False
break
ret = system_as_root and not system_without_root
logging.debug(
'Checked system-as-root=%s system_without_root=%s result=%s',
system_as_root,
system_without_root,
ret)
return ret
# override
def _handle_mount(self):
# Unsparse the image to a temp file
unsparsed_suffix = '_system.img.raw'
unsparsed_file = tempfile.NamedTemporaryFile(suffix=unsparsed_suffix)
unsparsed_filename = unsparsed_file.name
image_utils.unsparse(unsparsed_filename, self._image_filename)
# detect system-as-root if need
path_prefix = self._path_prefix
if path_prefix == self.DETECT_SYSTEM_AS_ROOT:
path_prefix = '/' if self._detect_system_as_root(
unsparsed_filename) else '/system/'
# Create a temp dir for the target of copying file from image
temp_dir = tempfile.mkdtemp()
logging.debug('Created temp dir: %s', temp_dir)
# Keep data to be removed on __exit__
self._unsparsed_file = unsparsed_file
self._temp_dir = tempfile.mkdtemp()
return _ImageFileAccessor(path_prefix, unsparsed_filename, temp_dir)
# override
def _handle_unmount(self):
if hasattr(self, '_temp_dir'):
logging.debug('Removing temp dir: %s', self._temp_dir)
shutil.rmtree(self._temp_dir)
del self._temp_dir
if hasattr(self, '_unsparsed_file'):
# will also delete the temp file implicitly
del self._unsparsed_file

View File

@@ -1,44 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ADB-related utilities."""
import logging
import subprocess
from gsi_util.utils.cmd_utils import run_command
def root(serial_num=None):
command = ['adb']
if serial_num:
command += ['-s', serial_num]
command += ['root']
# 'read_stdout=True' to disable output
run_command(command, raise_on_error=False, read_stdout=True, log_stderr=True)
def pull(local_filename, remote_filename, serial_num=None):
command = ['adb']
if serial_num:
command += ['-s', serial_num]
command += ['pull', remote_filename, local_filename]
# 'read_stdout=True' to disable output
(returncode, _, _) = run_command(
command, raise_on_error=False, read_stdout=True)
return returncode == 0

View File

@@ -1,113 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command-related utilities."""
from collections import namedtuple
import logging
import os
import shlex
import subprocess
import sys
CommandResult = namedtuple('CommandResult', 'returncode stdoutdata, stderrdata')
PIPE = subprocess.PIPE
_LOCAL_BIN_PATH = os.path.join(os.path.abspath(os.path.dirname(sys.argv[0])),
'bin')
def _update_command_for_local(command, kwargs):
if kwargs.get('shell', False):
# do nothing for shell commands
return
prog = command[0]
local_prog = os.path.join(_LOCAL_BIN_PATH, prog)
if os.path.isfile(local_prog) and os.access(local_prog, os.X_OK):
logging.debug('Using local executable: %s', local_prog)
command[0] = local_prog
def run_command(command, read_stdout=False, read_stderr=False,
log_stdout=False, log_stderr=False,
raise_on_error=True, sudo=False, **kwargs):
"""Runs a command and returns the results.
The method tries to use the executable in bin/ firstly.
Args:
command: A sequence of command arguments or else a single string.
read_stdout: If True, includes stdout data in the returned tuple.
Otherwise includes None in the returned tuple.
read_stderr: If True, includes stderr data in the returned tuple.
Otherwise includes None in the returned tuple.
log_stdout: If True, logs stdout data.
log_stderr: If True, logs stderro data.
raise_on_error: If True, raise exception if return code is nonzero.
sudo: Prepends 'sudo' to command if user is not root.
**kwargs: the keyword arguments passed to subprocess.Popen().
Returns:
A namedtuple CommandResult(returncode, stdoutdata, stderrdata).
The latter two fields will be set only when read_stdout/read_stderr
is True, respectively. Otherwise, they will be None.
Raises:
OSError: Not such a command to execute, raised by subprocess.Popen().
subprocess.CalledProcessError: The return code of the command is nonzero.
"""
_update_command_for_local(command, kwargs)
if sudo and os.getuid() != 0:
if kwargs.pop('shell', False):
command = ['sudo', 'sh', '-c', command]
else:
command = ['sudo'] + command
if read_stdout or log_stdout:
assert kwargs.get('stdout') in [None, PIPE]
kwargs['stdout'] = PIPE
if read_stderr or log_stderr:
assert kwargs.get('stderr') in [None, PIPE]
kwargs['stderr'] = PIPE
need_communicate = (read_stdout or read_stderr or
log_stdout or log_stderr)
proc = subprocess.Popen(command, **kwargs)
if need_communicate:
stdout, stderr = proc.communicate()
else:
proc.wait() # no need to communicate; just wait.
if kwargs.get('shell'):
command_in_log = command
else:
command_in_log = ' '.join(arg for arg in command)
logging.log(logging.INFO, 'Executed command: %r (ret: %d)',
command_in_log, proc.returncode)
log_level = logging.ERROR if proc.returncode != 0 else logging.INFO
if log_stdout:
logging.log(log_level, ' stdout: %r', stdout)
if log_stderr:
logging.log(log_level, ' stderr: %r', stderr)
if proc.returncode != 0 and raise_on_error:
raise subprocess.CalledProcessError(proc.returncode, command)
return CommandResult(proc.returncode,
stdout if read_stdout else None,
stderr if read_stderr else None)

View File

@@ -1,75 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""debugfs-related utilities."""
import logging
import os
import re
from gsi_util.utils.cmd_utils import run_command
_DEBUGFS = 'debugfs'
def dump(image_file, file_spec, out_file):
"""Dumps the content of the file file_spec to the output file out_file.
Args:
image_file: The image file to be query.
file_spec: The full file/directory in the image_file to be copied.
out_file: The output file name in the local directory.
Returns:
True if 'debugfs' command success. False otherwise.
"""
debugfs_command = 'dump {} {}'.format(file_spec, out_file)
run_command([_DEBUGFS, '-R', debugfs_command, image_file], log_stderr=True)
if not os.path.isfile(out_file):
logging.debug('debugfs failed to dump the file %s', file_spec)
return False
return True
def get_type(image_file, file_spec):
"""Gets the type of the given file_spec.
Args:
image_file: The image file to be query.
file_spec: The full file/directory in the image_file to be query.
Returns:
None if file_spec does not exist.
'regular' if file_spec is a file.
'directory' if file_spec is a directory.
"""
debugfs_command = 'stat {}'.format(file_spec)
_, output, error = run_command(
[_DEBUGFS, '-R', debugfs_command, image_file],
read_stdout=True,
read_stderr=True,
log_stderr=True)
if re.search('File not found', error):
logging.debug('get_type() returns None')
return None
# Search the "type:" field in the output, it should be 'regular' (for a file)
# or 'directory'
m = re.search('Type:\\s*([^\\s]+)', output)
assert m is not None, '{} outputs with an unknown format.'.format(_DEBUGFS)
ret = m.group(1)
logging.debug('get_type() returns \'%s\'', ret)
return ret

View File

@@ -1,45 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Fastboot-related commands."""
from gsi_util.utils.cmd_utils import run_command
def flash(partition_name, image_name=None, allow_error=False):
"""fastboot flash a partition with a given image."""
cmd = ['fastboot', 'flash', partition_name]
# image_name can be None, for `fastboot` to flash
# ${ANDROID_PRODUCT_OUT}/{partition_name}.img.
if image_name is not None:
cmd.append(image_name)
run_command(cmd, raise_on_error=not allow_error)
def erase(partition_name=None, allow_error=False):
"""fastboot erase a partition."""
if partition_name is None:
run_command(['fastboot', '-w'], raise_on_error=not allow_error)
else:
run_command(['fastboot', 'erase', partition_name],
raise_on_error=not allow_error)
def reboot():
"""fastboot reboot a device."""
run_command(['fastboot', 'reboot'], raise_on_error=False)

View File

@@ -1,44 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""File-related utilities."""
import contextlib
import os
import tempfile
@contextlib.contextmanager
def UnopenedTemporaryFile(**kwargs):
"""Creates and returns a unopened temprary file path.
This function is similar to tempfile.TemporaryFile, except that an
unopened file path is returend instead of a file-like object.
The file will be deleted when the context manager is closed.
Args:
**kwargs: Any keyward arguments passed to tempfile.mkstemp (e.g., dir,
prefix and suffix).
Returns:
An unopened temporary file path.
"""
fd, path = tempfile.mkstemp(**kwargs)
os.close(fd)
try:
yield path
finally:
if os.path.exists(path):
os.unlink(path)

View File

@@ -1,23 +0,0 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Image-related utilities."""
import logging
from gsi_util.utils.cmd_utils import run_command
def unsparse(output_filename, input_filename):
logging.debug('Unsparsing %s...', input_filename)
run_command(['simg2img', input_filename, output_filename])

View File

@@ -1,83 +0,0 @@
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SEPolicy-related commands."""
from gsi_util.utils import cmd_utils
def secilc(options, files):
"""Invokes SELinux Common Intermediate Language (CIL) Compiler.
Args:
options: A dict of the options passed to 'secilc'.
e.g., dict(mls='true', multiple-decls=None, policyvers=30) ==>
'--mls true --multiple-decls --policyvers 30'.
e.g., dict(M='true', m=None, c=30) ==> '-M true -m -c 30'.
files: CIL files passed to 'secilc'.
Returns:
A tuple of (result_ok, stderr).
$ secilc --help
Usage: secilc [OPTION]... FILE...
Options:
-o, --output=<file> write binary policy to <file>
(default: policy.<version>)
-f, --filecontext=<file> write file contexts to <file>
(default: file_contexts)
-t, --target=<type> specify target architecture. may be selinux or
xen. (default: selinux)
-M, --mls true|false build an mls policy. Must be true or false.
This will override the (mls boolean) statement
if present in the policy
-c, --policyvers=<version> build a binary policy with a given <version>
(default: 31)
-U, --handle-unknown=<action> how to handle unknown classes or permissions.
may be deny, allow, or reject. (default: deny)
This will override the (handleunknown action)
statement if present in the policy
-D, --disable-dontaudit do not add dontaudit rules to the binary policy
-P, --preserve-tunables treat tunables as booleans
-m, --multiple-decls allow some statements to be re-declared
-N, --disable-neverallow do not check neverallow rules
-G, --expand-generated Expand and remove auto-generated attributes
-X, --expand-size <SIZE> Expand type attributes with fewer than <SIZE>
members.
-v, --verbose increment verbosity level
-h, --help display usage information
"""
cmd = ['secilc']
for option in options:
# For short options. e.g., '-m', '-c 30'.
if len(option) == 1:
cmd.append('-' + option)
else: # For long options. e.g., '--multiple-decls', '--policyvers 30'.
cmd.append('--' + option)
# Some option doesn't need value. e.g., -m, -G.
if options[option] is not None:
cmd.append(options[option])
# Adding CIL files.
cmd.extend(files)
# Uses 'log_stdout' and 'log_stderr' to disable output.
returncode, _, stderrdata = cmd_utils.run_command(cmd,
raise_on_error=False,
log_stdout=True,
log_stderr=True,
read_stderr=True)
return (returncode == 0, stderrdata)

View File

@@ -1,135 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit test for gsi_util.utils.cmd_utils."""
import logging
from logging import handlers
import shutil
import subprocess
import tempfile
import unittest
from gsi_util.utils import cmd_utils
class RunCommandTest(unittest.TestCase):
def setUp(self):
"""Sets up logging output for assert checks."""
log_entries = self._log_entries = []
class Target(object):
"""The target handler to store log output."""
def handle(self, record):
log_entries.append((record.levelname, record.msg % record.args))
self._handler = handlers.MemoryHandler(capacity=0, target=Target())
logging.getLogger().addHandler(self._handler)
def tearDown(self):
"""Removes logging handler."""
logging.getLogger().removeHandler(self._handler)
def test_command_sequence(self):
result = cmd_utils.run_command(['echo', '123'])
self.assertEqual((0, None, None), result)
self.assertEqual(('INFO', "Executed command: 'echo 123' (ret: 0)"),
self._log_entries[0])
def test_shell_command(self):
result = cmd_utils.run_command('echo uses shell', shell=True)
self.assertEqual((0, None, None), result)
self.assertEqual(('INFO', "Executed command: 'echo uses shell' (ret: 0)"),
self._log_entries[0])
def test_log_stdout(self):
result = cmd_utils.run_command(['echo', '456'], raise_on_error=False,
log_stdout=True)
self.assertEqual((0, None, None), result)
self.assertEqual(('INFO', "Executed command: 'echo 456' (ret: 0)"),
self._log_entries[0])
self.assertEqual(('INFO', " stdout: '456\\n'"),
self._log_entries[1])
def test_log_stderr(self):
error_cmd = 'echo foo; echo bar; (echo 123; echo 456;)>&2; exit 3'
result = cmd_utils.run_command(error_cmd, shell=True, raise_on_error=False,
log_stderr=True)
self.assertEqual((3, None, None), result)
self.assertEqual(
('INFO', 'Executed command: %r (ret: %d)' % (error_cmd, 3)),
self._log_entries[0])
self.assertEqual(('ERROR', " stderr: '123\\n456\\n'"),
self._log_entries[1])
def test_log_stdout_and_log_stderr(self):
error_cmd = 'echo foo; echo bar; (echo 123; echo 456;)>&2; exit 5'
result = cmd_utils.run_command(error_cmd, shell=True, raise_on_error=False,
log_stdout=True, log_stderr=True)
self.assertEqual((5, None, None), result)
self.assertEqual(
('INFO', 'Executed command: %r (ret: %d)' % (error_cmd, 5)),
self._log_entries[0])
self.assertEqual(('ERROR', " stdout: 'foo\\nbar\\n'"),
self._log_entries[1])
self.assertEqual(('ERROR', " stderr: '123\\n456\\n'"),
self._log_entries[2])
def test_read_stdout(self):
result = cmd_utils.run_command('echo 123; echo 456; exit 7', shell=True,
read_stdout=True, raise_on_error=False)
self.assertEqual(7, result.returncode)
self.assertEqual('123\n456\n', result.stdoutdata)
self.assertEqual(None, result.stderrdata)
def test_read_stderr(self):
result = cmd_utils.run_command('(echo error1; echo error2)>&2; exit 9',
shell=True, read_stderr=True,
raise_on_error=False)
self.assertEqual(9, result.returncode)
self.assertEqual(None, result.stdoutdata)
self.assertEqual('error1\nerror2\n', result.stderrdata)
def test_read_stdout_and_stderr(self):
result = cmd_utils.run_command('echo foo; echo bar>&2; exit 11',
shell=True, read_stdout=True,
read_stderr=True, raise_on_error=False)
self.assertEqual(11, result.returncode)
self.assertEqual('foo\n', result.stdoutdata)
self.assertEqual('bar\n', result.stderrdata)
def test_raise_on_error(self):
error_cmd = 'echo foo; exit 13'
with self.assertRaises(subprocess.CalledProcessError) as context_manager:
cmd_utils.run_command(error_cmd, shell=True, raise_on_error=True)
proc_err = context_manager.exception
self.assertEqual(13, proc_err.returncode)
self.assertEqual(error_cmd, proc_err.cmd)
def test_change_working_directory(self):
"""Tests that cwd argument can be passed to subprocess.Popen()."""
tmp_dir = tempfile.mkdtemp(prefix='cmd_utils_test')
result = cmd_utils.run_command('pwd', shell=True,
read_stdout=True, raise_on_error=False,
cwd=tmp_dir)
self.assertEqual('%s\n' % tmp_dir, result.stdoutdata)
shutil.rmtree(tmp_dir)
if __name__ == '__main__':
logging.basicConfig(format='%(message)s', level=logging.INFO)
unittest.main()

View File

@@ -1,57 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit test for gsi_util.utils.file_utils."""
import os
import tempfile
import unittest
from gsi_util.utils import file_utils
class UnopenedTemporaryFileTest(unittest.TestCase):
"""Unit test for UnopenedTemporaryFile."""
def test_normal(self):
with file_utils.UnopenedTemporaryFile(
prefix='prefix', suffix='suffix') as f:
self.assertTrue(os.path.exists(f))
self.assertEqual(0, os.path.getsize(f))
self.assertRegexpMatches(os.path.basename(f), r'prefix.+suffix')
self.assertEqual(tempfile.gettempdir(), os.path.dirname(f))
self.assertFalse(os.path.exists(f))
def test_remove_before_exit(self):
with file_utils.UnopenedTemporaryFile() as f:
self.assertTrue(os.path.exists(f))
os.unlink(f)
self.assertFalse(os.path.exists(f))
self.assertFalse(os.path.exists(f))
def test_rename_before_exit(self):
with file_utils.UnopenedTemporaryFile() as f:
self.assertTrue(os.path.exists(f))
new_name = f + '.new'
os.rename(f, new_name)
self.assertFalse(os.path.exists(f))
self.assertFalse(os.path.exists(f))
self.assertTrue(os.path.exists(new_name))
os.unlink(new_name)
if __name__ == '__main__':
unittest.main()

View File

@@ -1,43 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""VINTF-related utilities."""
import logging
from gsi_util.utils import cmd_utils
def checkvintf(manifest, matrix):
"""Invokes host command 'checkvintf' to do VINTF check.
Usage of the command: checkvintf <manifest.xml> <matrix.xml>.
Args:
manifest: the manifest XML file.
matrix: the matrix XML file.
Returns:
A tuple of (result_ok, stderr).
"""
logging.debug('checkvintf %s %s...', manifest, matrix)
# Uses 'read_stdout' and 'read_stderr' to disable output.
returncode, _, stderrdata = cmd_utils.run_command(
['checkvintf', manifest, matrix],
raise_on_error=False,
read_stdout=True,
read_stderr=True)
return (returncode == 0, stderrdata)

View File

@@ -1,45 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Script to run */tests/*_unittest.py files."""
from multiprocessing import Process
import os
import runpy
def get_unittest_files():
matches = []
for dirpath, _, filenames in os.walk('.'):
if os.path.basename(dirpath) == 'tests':
matches.extend(os.path.join(dirpath, f)
for f in filenames if f.endswith('_unittest.py'))
return matches
def run_test(unittest_file):
runpy.run_path(unittest_file, run_name='__main__')
if __name__ == '__main__':
for path in get_unittest_files():
# Forks a process to run the unittest.
# Otherwise, it only runs one unittest.
p = Process(target=run_test, args=(path,))
p.start()
p.join()
if p.exitcode != 0:
break # stops on any failure unittest