gsi_util: adding cmd_utils

Adding a run_command function to execute a host command.
Also support reading the stdout and/or stderr of the result if desired,
which might be useful for further parsing and flow decision.

A typical usage will be passing a command sequence:
  cmd_utils.run_command(['echo', '123'])

It also supports running a 'shell' command, by passing a single string:
  cmd_utils.run_command('echo 123', shell=True)

To get the stdout and/or stderr data, just add 'read_stdout=True' and/or
'read_stderr=True':
  cmd_utils.run_command('echo 123', shell=True,
                        read_stdout=True, read_stderr=True),

which returns a namedtuple:
  ('CommandResult', 'returncode stdoutdata, stderrdata').

Note that other keyword arguments will be passed to subprocess.Popen().
e.g., the following command will change current directory to
'my_working_dir' prior to execute the command:
  cmd_utils.run_command('echo 123', shell=True, cwd=my_working_dir)

More usage examples can be found in cmd_utils_unittest.py.

Also adds a run_test.py to search then run ./*tests/*_unittest.py files.

Bug: 70477387
Test: make gsi_util
Test: ./run_test.py
Change-Id: Id3aae935f941818fe7415798937fd07dbbe6ba33
This commit is contained in:
Bowgo Tsai
2017-12-11 11:11:08 +08:00
parent 73fec9375d
commit a1f4058845
5 changed files with 273 additions and 0 deletions

View File

@@ -18,6 +18,7 @@ python_binary_host {
"gsi_util.py", "gsi_util.py",
"gsi_util/*.py", "gsi_util/*.py",
"gsi_util/commands/*.py", "gsi_util/commands/*.py",
"gsi_util/utils/*.py",
], ],
version: { version: {
py2: { py2: {

View File

View File

@@ -0,0 +1,92 @@
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command-related utilities."""
from collections import namedtuple
import logging
import os
import subprocess
CommandResult = namedtuple('CommandResult', 'returncode stdoutdata, stderrdata')
PIPE = subprocess.PIPE
def run_command(command, read_stdout=False, read_stderr=False,
log_stdout=False, log_stderr=False,
raise_on_error=True, sudo=False, **kwargs):
"""Runs a command and returns the results.
Args:
command: A sequence of command arguments or else a single string.
read_stdout: If True, includes stdout data in the returned tuple.
Otherwise includes None in the returned tuple.
read_stderr: If True, includes stderr data in the returned tuple.
Otherwise includes None in the returned tuple.
log_stdout: If True, logs stdout data.
log_stderr: If True, logs stderro data.
raise_on_error: If True, raise exception if return code is nonzero.
sudo: Prepends 'sudo' to command if user is not root.
**kwargs: the keyword arguments passed to subprocess.Popen().
Returns:
A namedtuple CommandResult(returncode, stdoutdata, stderrdata).
The latter two fields will be set only when read_stdout/read_stderr
is True, respectively. Otherwise, they will be None.
Raises:
OSError: Not such a command to execute, raised by subprocess.Popen().
subprocess.CalledProcessError: The return code of the command is nonzero.
"""
if sudo and os.getuid() != 0:
if kwargs.pop('shell', False):
command = ['sudo', 'sh', '-c', command]
else:
command = ['sudo'] + command
if kwargs.get('shell'):
command_in_log = command
else:
command_in_log = ' '.join(arg for arg in command)
if read_stdout or log_stdout:
assert kwargs.get('stdout') in [None, PIPE]
kwargs['stdout'] = PIPE
if read_stderr or log_stderr:
assert kwargs.get('stderr') in [None, PIPE]
kwargs['stderr'] = PIPE
need_communicate = (read_stdout or read_stderr or
log_stdout or log_stderr)
proc = subprocess.Popen(command, **kwargs)
if need_communicate:
stdout, stderr = proc.communicate()
else:
proc.wait() # no need to communicate; just wait.
log_level = logging.ERROR if proc.returncode != 0 else logging.INFO
logging.log(log_level, 'Executed command: %r (ret: %d)',
command_in_log, proc.returncode)
if log_stdout:
logging.log(log_level, ' stdout: %r', stdout)
if log_stderr:
logging.log(log_level, ' stderr: %r', stderr)
if proc.returncode != 0 and raise_on_error:
raise subprocess.CalledProcessError(proc.returncode, command)
return CommandResult(proc.returncode,
stdout if read_stdout else None,
stderr if read_stderr else None)

View File

@@ -0,0 +1,135 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit test for gsi_util.utils.cmd_utils."""
import logging
from logging import handlers
import shutil
import subprocess
import tempfile
import unittest
from gsi_util.utils import cmd_utils
class RunCommandTest(unittest.TestCase):
def setUp(self):
"""Sets up logging output for assert checks."""
log_entries = self._log_entries = []
class Target(object):
"""The target handler to store log output."""
def handle(self, record):
log_entries.append((record.levelname, record.msg % record.args))
self._handler = handlers.MemoryHandler(capacity=0, target=Target())
logging.getLogger().addHandler(self._handler)
def tearDown(self):
"""Removes logging handler."""
logging.getLogger().removeHandler(self._handler)
def test_command_sequence(self):
result = cmd_utils.run_command(['echo', '123'])
self.assertEqual((0, None, None), result)
self.assertEqual(('INFO', "Executed command: 'echo 123' (ret: 0)"),
self._log_entries[0])
def test_shell_command(self):
result = cmd_utils.run_command('echo uses shell', shell=True)
self.assertEqual((0, None, None), result)
self.assertEqual(('INFO', "Executed command: 'echo uses shell' (ret: 0)"),
self._log_entries[0])
def test_log_stdout(self):
result = cmd_utils.run_command(['echo', '456'], raise_on_error=False,
log_stdout=True)
self.assertEqual((0, None, None), result)
self.assertEqual(('INFO', "Executed command: 'echo 456' (ret: 0)"),
self._log_entries[0])
self.assertEqual(('INFO', " stdout: '456\\n'"),
self._log_entries[1])
def test_log_stderr(self):
error_cmd = 'echo foo; echo bar; (echo 123; echo 456;)>&2; exit 3'
result = cmd_utils.run_command(error_cmd, shell=True, raise_on_error=False,
log_stderr=True)
self.assertEqual((3, None, None), result)
self.assertEqual(
('ERROR', 'Executed command: %r (ret: %d)' % (error_cmd, 3)),
self._log_entries[0])
self.assertEqual(('ERROR', " stderr: '123\\n456\\n'"),
self._log_entries[1])
def test_log_stdout_and_log_stderr(self):
error_cmd = 'echo foo; echo bar; (echo 123; echo 456;)>&2; exit 5'
result = cmd_utils.run_command(error_cmd, shell=True, raise_on_error=False,
log_stdout=True, log_stderr=True)
self.assertEqual((5, None, None), result)
self.assertEqual(
('ERROR', 'Executed command: %r (ret: %d)' % (error_cmd, 5)),
self._log_entries[0])
self.assertEqual(('ERROR', " stdout: 'foo\\nbar\\n'"),
self._log_entries[1])
self.assertEqual(('ERROR', " stderr: '123\\n456\\n'"),
self._log_entries[2])
def test_read_stdout(self):
result = cmd_utils.run_command('echo 123; echo 456; exit 7', shell=True,
read_stdout=True, raise_on_error=False)
self.assertEqual(7, result.returncode)
self.assertEqual('123\n456\n', result.stdoutdata)
self.assertEqual(None, result.stderrdata)
def test_read_stderr(self):
result = cmd_utils.run_command('(echo error1; echo error2)>&2; exit 9',
shell=True, read_stderr=True,
raise_on_error=False)
self.assertEqual(9, result.returncode)
self.assertEqual(None, result.stdoutdata)
self.assertEqual('error1\nerror2\n', result.stderrdata)
def test_read_stdout_and_stderr(self):
result = cmd_utils.run_command('echo foo; echo bar>&2; exit 11',
shell=True, read_stdout=True,
read_stderr=True, raise_on_error=False)
self.assertEqual(11, result.returncode)
self.assertEqual('foo\n', result.stdoutdata)
self.assertEqual('bar\n', result.stderrdata)
def test_raise_on_error(self):
error_cmd = 'echo foo; exit 13'
with self.assertRaises(subprocess.CalledProcessError) as context_manager:
cmd_utils.run_command(error_cmd, shell=True, raise_on_error=True)
proc_err = context_manager.exception
self.assertEqual(13, proc_err.returncode)
self.assertEqual(error_cmd, proc_err.cmd)
def test_change_working_directory(self):
"""Tests that cwd argument can be passed to subprocess.Popen()."""
tmp_dir = tempfile.mkdtemp(prefix='cmd_utils_test')
result = cmd_utils.run_command('pwd', shell=True,
read_stdout=True, raise_on_error=False,
cwd=tmp_dir)
self.assertEqual('%s\n' % tmp_dir, result.stdoutdata)
shutil.rmtree(tmp_dir)
if __name__ == '__main__':
logging.basicConfig(format='%(message)s', level=logging.INFO)
unittest.main()

45
gsi/gsi_util/run_test.py Executable file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python
#
# Copyright 2017 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Script to run */tests/*_unittest.py files."""
from multiprocessing import Process
import os
import runpy
def get_unittest_files():
matches = []
for dirpath, _, filenames in os.walk('.'):
if os.path.basename(dirpath) == 'tests':
matches.extend(os.path.join(dirpath, f)
for f in filenames if f.endswith('_unittest.py'))
return matches
def run_test(unittest_file):
runpy.run_path(unittest_file, run_name='__main__')
if __name__ == '__main__':
for path in get_unittest_files():
# Forks a process to run the unittest.
# Otherwise, it only runs one unittest.
p = Process(target=run_test, args=(path,))
p.start()
p.join()
if p.exitcode != 0:
break # stops on any failure unittest