diff --git a/python-packages/.gitignore b/python-packages/.gitignore new file mode 100644 index 000000000..a65d04669 --- /dev/null +++ b/python-packages/.gitignore @@ -0,0 +1,58 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ diff --git a/python-packages/adb/__init__.py b/python-packages/adb/__init__.py new file mode 100644 index 000000000..6b509c643 --- /dev/null +++ b/python-packages/adb/__init__.py @@ -0,0 +1,17 @@ +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import +from .device import * # pylint: disable=wildcard-import diff --git a/python-packages/adb/device.py b/python-packages/adb/device.py new file mode 100644 index 000000000..516e88003 --- /dev/null +++ b/python-packages/adb/device.py @@ -0,0 +1,339 @@ +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +import os +import re +import subprocess +import tempfile + + +class FindDeviceError(RuntimeError): + pass + + +class DeviceNotFoundError(FindDeviceError): + def __init__(self, serial): + self.serial = serial + super(DeviceNotFoundError, self).__init__( + 'No device with serial {}'.format(serial)) + + +class NoUniqueDeviceError(FindDeviceError): + def __init__(self): + super(NoUniqueDeviceError, self).__init__('No unique device') + + +class ShellError(RuntimeError): + def __init__(self, cmd, stdout, stderr, exit_code): + super(ShellError, self).__init__( + '`{0}` exited with code {1}'.format(cmd, exit_code)) + self.cmd = cmd + self.stdout = stdout + self.stderr = stderr + self.exit_code = exit_code + + +def get_devices(): + with open(os.devnull, 'wb') as devnull: + subprocess.check_call(['adb', 'start-server'], stdout=devnull, + stderr=devnull) + out = subprocess.check_output(['adb', 'devices']).splitlines() + + # The first line of `adb devices` just says "List of attached devices", so + # skip that. + devices = [] + for line in out[1:]: + if not line.strip(): + continue + if 'offline' in line: + continue + + serial, _ = re.split(r'\s+', line, maxsplit=1) + devices.append(serial) + return devices + + +def _get_unique_device(product=None): + devices = get_devices() + if len(devices) != 1: + raise NoUniqueDeviceError() + return AndroidDevice(devices[0], product) + + +def _get_device_by_serial(serial, product=None): + for device in get_devices(): + if device == serial: + return AndroidDevice(serial, product) + raise DeviceNotFoundError(serial) + + +def get_device(serial=None, product=None): + """Get a uniquely identified AndroidDevice if one is available. + + Raises: + DeviceNotFoundError: + The serial specified by `serial` or $ANDROID_SERIAL is not + connected. + + NoUniqueDeviceError: + Neither `serial` nor $ANDROID_SERIAL was set, and the number of + devices connected to the system is not 1. Having 0 connected + devices will also result in this error. + + Returns: + An AndroidDevice associated with the first non-None identifier in the + following order of preference: + + 1) The `serial` argument. + 2) The environment variable $ANDROID_SERIAL. + 3) The single device connnected to the system. + """ + if serial is not None: + return _get_device_by_serial(serial, product) + + android_serial = os.getenv('ANDROID_SERIAL') + if android_serial is not None: + return _get_device_by_serial(android_serial, product) + + return _get_unique_device(product) + +# Call this instead of subprocess.check_output() to work-around issue in Python +# 2's subprocess class on Windows where it doesn't support Unicode. This +# writes the command line to a UTF-8 batch file that is properly interpreted +# by cmd.exe. +def _subprocess_check_output(*popenargs, **kwargs): + # Only do this slow work-around if Unicode is in the cmd line. + if (os.name == 'nt' and + any(isinstance(arg, unicode) for arg in popenargs[0])): + # cmd.exe requires a suffix to know that it is running a batch file + tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False) + # @ in batch suppresses echo of the current line. + # Change the codepage to 65001, the UTF-8 codepage. + tf.write('@chcp 65001 > nul\r\n') + tf.write('@') + # Properly quote all the arguments and encode in UTF-8. + tf.write(subprocess.list2cmdline(popenargs[0]).encode('utf-8')) + tf.close() + + try: + result = subprocess.check_output(['cmd.exe', '/c', tf.name], + **kwargs) + except subprocess.CalledProcessError as e: + # Show real command line instead of the cmd.exe command line. + raise subprocess.CalledProcessError(e.returncode, popenargs[0], + output=e.output) + finally: + os.remove(tf.name) + return result + else: + return subprocess.check_output(*popenargs, **kwargs) + +class AndroidDevice(object): + # Delimiter string to indicate the start of the exit code. + _RETURN_CODE_DELIMITER = 'x' + + # Follow any shell command with this string to get the exit + # status of a program since this isn't propagated by adb. + # + # The delimiter is needed because `printf 1; echo $?` would print + # "10", and we wouldn't be able to distinguish the exit code. + _RETURN_CODE_PROBE_STRING = 'echo "{0}$?"'.format(_RETURN_CODE_DELIMITER) + + # Maximum search distance from the output end to find the delimiter. + # adb on Windows returns \r\n even if adbd returns \n. + _RETURN_CODE_SEARCH_LENGTH = len('{0}255\r\n'.format(_RETURN_CODE_DELIMITER)) + + # Shell protocol feature string. + SHELL_PROTOCOL_FEATURE = 'shell_2' + + def __init__(self, serial, product=None): + self.serial = serial + self.product = product + self.adb_cmd = ['adb'] + if self.serial is not None: + self.adb_cmd.extend(['-s', serial]) + if self.product is not None: + self.adb_cmd.extend(['-p', product]) + self._linesep = None + self._features = None + + @property + def linesep(self): + if self._linesep is None: + self._linesep = subprocess.check_output(self.adb_cmd + + ['shell', 'echo']) + return self._linesep + + @property + def features(self): + if self._features is None: + try: + self._features = self._simple_call(['features']).splitlines() + except subprocess.CalledProcessError: + self._features = [] + return self._features + + def _make_shell_cmd(self, user_cmd): + command = self.adb_cmd + ['shell'] + user_cmd + if self.SHELL_PROTOCOL_FEATURE not in self.features: + command.append('; ' + self._RETURN_CODE_PROBE_STRING) + return command + + def _parse_shell_output(self, out): + """Finds the exit code string from shell output. + + Args: + out: Shell output string. + + Returns: + An (exit_code, output_string) tuple. The output string is + cleaned of any additional stuff we appended to find the + exit code. + + Raises: + RuntimeError: Could not find the exit code in |out|. + """ + search_text = out + if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH: + # We don't want to search over massive amounts of data when we know + # the part we want is right at the end. + search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:] + partition = search_text.rpartition(self._RETURN_CODE_DELIMITER) + if partition[1] == '': + raise RuntimeError('Could not find exit status in shell output.') + result = int(partition[2]) + # partition[0] won't contain the full text if search_text was truncated, + # pull from the original string instead. + out = out[:-len(partition[1]) - len(partition[2])] + return result, out + + def _simple_call(self, cmd): + logging.info(' '.join(self.adb_cmd + cmd)) + return _subprocess_check_output( + self.adb_cmd + cmd, stderr=subprocess.STDOUT) + + def shell(self, cmd): + """Calls `adb shell` + + Args: + cmd: string shell command to execute. + + Returns: + A (stdout, stderr) tuple. Stderr may be combined into stdout + if the device doesn't support separate streams. + + Raises: + ShellError: the exit code was non-zero. + """ + exit_code, stdout, stderr = self.shell_nocheck(cmd) + if exit_code != 0: + raise ShellError(cmd, stdout, stderr, exit_code) + return stdout, stderr + + def shell_nocheck(self, cmd): + """Calls `adb shell` + + Args: + cmd: string shell command to execute. + + Returns: + An (exit_code, stdout, stderr) tuple. Stderr may be combined + into stdout if the device doesn't support separate streams. + """ + cmd = self._make_shell_cmd(cmd) + logging.info(' '.join(cmd)) + p = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + if self.SHELL_PROTOCOL_FEATURE in self.features: + exit_code = p.returncode + else: + exit_code, stdout = self._parse_shell_output(stdout) + return exit_code, stdout, stderr + + def install(self, filename, replace=False): + cmd = ['install'] + if replace: + cmd.append('-r') + cmd.append(filename) + return self._simple_call(cmd) + + def push(self, local, remote): + return self._simple_call(['push', local, remote]) + + def pull(self, remote, local): + return self._simple_call(['pull', remote, local]) + + def sync(self, directory=None): + cmd = ['sync'] + if directory is not None: + cmd.append(directory) + return self._simple_call(cmd) + + def forward(self, local, remote): + return self._simple_call(['forward', local, remote]) + + def tcpip(self, port): + return self._simple_call(['tcpip', port]) + + def usb(self): + return self._simple_call(['usb']) + + def reboot(self): + return self._simple_call(['reboot']) + + def root(self): + return self._simple_call(['root']) + + def unroot(self): + return self._simple_call(['unroot']) + + def forward_remove(self, local): + return self._simple_call(['forward', '--remove', local]) + + def forward_remove_all(self): + return self._simple_call(['forward', '--remove-all']) + + def connect(self, host): + return self._simple_call(['connect', host]) + + def disconnect(self, host): + return self._simple_call(['disconnect', host]) + + def reverse(self, remote, local): + return self._simple_call(['reverse', remote, local]) + + def reverse_remove_all(self): + return self._simple_call(['reverse', '--remove-all']) + + def reverse_remove(self, remote): + return self._simple_call(['reverse', '--remove', remote]) + + def wait(self): + return self._simple_call(['wait-for-device']) + + def get_prop(self, prop_name): + output = self.shell(['getprop', prop_name])[0].splitlines() + if len(output) != 1: + raise RuntimeError('Too many lines in getprop output:\n' + + '\n'.join(output)) + value = output[0] + if not value.strip(): + return None + return value + + def set_prop(self, prop_name, value): + self.shell(['setprop', prop_name, value]) diff --git a/python-packages/adb/setup.py b/python-packages/adb/setup.py new file mode 100644 index 000000000..5595cdd1d --- /dev/null +++ b/python-packages/adb/setup.py @@ -0,0 +1,32 @@ +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from distutils.core import setup + + +setup( + name='adb', + version='0.0.1', + description='A Python interface to the Android Debug Bridge.', + license='Apache 2.0', + keywords='adb android', + package_dir={'adb': ''}, + packages=['adb'], + classifiers=[ + 'Development Status :: 2 - Pre-Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: Apache Software License', + ] +) diff --git a/python-packages/adb/test_device.py b/python-packages/adb/test_device.py new file mode 100644 index 000000000..d033a019b --- /dev/null +++ b/python-packages/adb/test_device.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import print_function + +import hashlib +import os +import posixpath +import random +import shlex +import shutil +import signal +import subprocess +import tempfile +import unittest + +import mock + +import adb + + +def requires_root(func): + def wrapper(self, *args): + if self.device.get_prop('ro.debuggable') != '1': + raise unittest.SkipTest('requires rootable build') + + was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' + if not was_root: + self.device.root() + self.device.wait() + + try: + func(self, *args) + finally: + if not was_root: + self.device.unroot() + self.device.wait() + + return wrapper + + +class GetDeviceTest(unittest.TestCase): + def setUp(self): + self.android_serial = os.getenv('ANDROID_SERIAL') + if 'ANDROID_SERIAL' in os.environ: + del os.environ['ANDROID_SERIAL'] + + def tearDown(self): + if self.android_serial is not None: + os.environ['ANDROID_SERIAL'] = self.android_serial + else: + if 'ANDROID_SERIAL' in os.environ: + del os.environ['ANDROID_SERIAL'] + + @mock.patch('adb.device.get_devices') + def test_explicit(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + device = adb.get_device('foo') + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_from_env(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + os.environ['ANDROID_SERIAL'] = 'foo' + device = adb.get_device() + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_arg_beats_env(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + os.environ['ANDROID_SERIAL'] = 'bar' + device = adb.get_device('foo') + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_no_such_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) + + os.environ['ANDROID_SERIAL'] = 'baz' + self.assertRaises(adb.DeviceNotFoundError, adb.get_device) + + @mock.patch('adb.device.get_devices') + def test_unique_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo'] + device = adb.get_device() + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_no_unique_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) + + +class DeviceTest(unittest.TestCase): + def setUp(self): + self.device = adb.get_device() + + +class ShellTest(DeviceTest): + def test_cat(self): + """Check that we can at least cat a file.""" + out = self.device.shell(['cat', '/proc/uptime'])[0].strip() + elements = out.split() + self.assertEqual(len(elements), 2) + + uptime, idle = elements + self.assertGreater(float(uptime), 0.0) + self.assertGreater(float(idle), 0.0) + + def test_throws_on_failure(self): + self.assertRaises(adb.ShellError, self.device.shell, ['false']) + + def test_output_not_stripped(self): + out = self.device.shell(['echo', 'foo'])[0] + self.assertEqual(out, 'foo' + self.device.linesep) + + def test_shell_nocheck_failure(self): + rc, out, _ = self.device.shell_nocheck(['false']) + self.assertNotEqual(rc, 0) + self.assertEqual(out, '') + + def test_shell_nocheck_output_not_stripped(self): + rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo' + self.device.linesep) + + def test_can_distinguish_tricky_results(self): + # If result checking on ADB shell is naively implemented as + # `adb shell ; echo $?`, we would be unable to distinguish the + # output from the result for a cmd of `echo -n 1`. + rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) + self.assertEqual(rc, 0) + self.assertEqual(out, '1') + + def test_line_endings(self): + """Ensure that line ending translation is not happening in the pty. + + Bug: http://b/19735063 + """ + output = self.device.shell(['uname'])[0] + self.assertEqual(output, 'Linux' + self.device.linesep) + + def test_pty_logic(self): + """Verify PTY logic for shells. + + Interactive shells should use a PTY, non-interactive should not. + + Bug: http://b/21215503 + """ + proc = subprocess.Popen( + self.device.adb_cmd + ['shell'], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + # [ -t 0 ] is used (rather than `tty`) to provide portability. This + # gives an exit code of 0 iff stdin is connected to a terminal. + # + # Closing host-side stdin doesn't currently trigger the interactive + # shell to exit so we need to explicitly add an exit command to + # close the session from the device side, and append \n to complete + # the interactive command. + result = proc.communicate('[ -t 0 ]; echo x$?; exit 0\n')[0] + partition = result.rpartition('x') + self.assertEqual(partition[1], 'x') + self.assertEqual(int(partition[2]), 0) + + exit_code = self.device.shell_nocheck(['[ -t 0 ]'])[0] + self.assertEqual(exit_code, 1) + + def test_shell_protocol(self): + """Tests the shell protocol on the device. + + If the device supports shell protocol, this gives us the ability + to separate stdout/stderr and return the exit code directly. + + Bug: http://b/19734861 + """ + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('shell protocol unsupported on this device') + result = self.device.shell_nocheck( + shlex.split('echo foo; echo bar >&2; exit 17')) + + self.assertEqual(17, result[0]) + self.assertEqual('foo' + self.device.linesep, result[1]) + self.assertEqual('bar' + self.device.linesep, result[2]) + + def test_non_interactive_sigint(self): + """Tests that SIGINT in a non-interactive shell kills the process. + + This requires the shell protocol in order to detect the broken + pipe; raw data transfer mode will only see the break once the + subprocess tries to read or write. + + Bug: http://b/23825725 + """ + if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: + raise unittest.SkipTest('shell protocol unsupported on this device') + + # Start a long-running process. + sleep_proc = subprocess.Popen( + self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + remote_pid = sleep_proc.stdout.readline().strip() + self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') + proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) + + # Verify that the process is running, send signal, verify it stopped. + self.device.shell(proc_query) + os.kill(sleep_proc.pid, signal.SIGINT) + sleep_proc.communicate() + self.assertEqual(1, self.device.shell_nocheck(proc_query)[0], + 'subprocess failed to terminate') + + +class ArgumentEscapingTest(DeviceTest): + def test_shell_escaping(self): + """Make sure that argument escaping is somewhat sane.""" + + # http://b/19734868 + # Note that this actually matches ssh(1)'s behavior --- it's + # converted to `sh -c echo hello; echo world` which sh interprets + # as `sh -c echo` (with an argument to that shell of "hello"), + # and then `echo world` back in the first shell. + result = self.device.shell( + shlex.split("sh -c 'echo hello; echo world'"))[0] + result = result.splitlines() + self.assertEqual(['', 'world'], result) + # If you really wanted "hello" and "world", here's what you'd do: + result = self.device.shell( + shlex.split(r'echo hello\;echo world'))[0].splitlines() + self.assertEqual(['hello', 'world'], result) + + # http://b/15479704 + result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() + self.assertEqual('t', result) + result = self.device.shell( + shlex.split("sh -c 'true && echo t'"))[0].strip() + self.assertEqual('t', result) + + # http://b/20564385 + result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() + self.assertEqual('t', result) + result = self.device.shell( + shlex.split(r'echo -n 123\;uname'))[0].strip() + self.assertEqual('123Linux', result) + + def test_install_argument_escaping(self): + """Make sure that install argument escaping works.""" + # http://b/20323053 + tf = tempfile.NamedTemporaryFile('wb', suffix='-text;ls;1.apk', + delete=False) + tf.close() + self.assertIn("-text;ls;1.apk", self.device.install(tf.name)) + os.remove(tf.name) + + # http://b/3090932 + tf = tempfile.NamedTemporaryFile('wb', suffix="-Live Hold'em.apk", + delete=False) + tf.close() + self.assertIn("-Live Hold'em.apk", self.device.install(tf.name)) + os.remove(tf.name) + + +class RootUnrootTest(DeviceTest): + def _test_root(self): + message = self.device.root() + if 'adbd cannot run as root in production builds' in message: + return + self.device.wait() + self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) + + def _test_unroot(self): + self.device.unroot() + self.device.wait() + self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) + + def test_root_unroot(self): + """Make sure that adb root and adb unroot work, using id(1).""" + if self.device.get_prop('ro.debuggable') != '1': + raise unittest.SkipTest('requires rootable build') + + original_user = self.device.shell(['id', '-un'])[0].strip() + try: + if original_user == 'root': + self._test_unroot() + self._test_root() + elif original_user == 'shell': + self._test_root() + self._test_unroot() + finally: + if original_user == 'root': + self.device.root() + else: + self.device.unroot() + self.device.wait() + + +class TcpIpTest(DeviceTest): + def test_tcpip_failure_raises(self): + """adb tcpip requires a port. + + Bug: http://b/22636927 + """ + self.assertRaises( + subprocess.CalledProcessError, self.device.tcpip, '') + self.assertRaises( + subprocess.CalledProcessError, self.device.tcpip, 'foo') + + +class SystemPropertiesTest(DeviceTest): + def test_get_prop(self): + self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') + + @requires_root + def test_set_prop(self): + prop_name = 'foo.bar' + self.device.shell(['setprop', prop_name, '""']) + + self.device.set_prop(prop_name, 'qux') + self.assertEqual( + self.device.shell(['getprop', prop_name])[0].strip(), 'qux') + + +def compute_md5(string): + hsh = hashlib.md5() + hsh.update(string) + return hsh.hexdigest() + + +def get_md5_prog(device): + """Older platforms (pre-L) had the name md5 rather than md5sum.""" + try: + device.shell(['md5sum', '/proc/uptime']) + return 'md5sum' + except subprocess.CalledProcessError: + return 'md5' + + +class HostFile(object): + def __init__(self, handle, checksum): + self.handle = handle + self.checksum = checksum + self.full_path = handle.name + self.base_name = os.path.basename(self.full_path) + + +class DeviceFile(object): + def __init__(self, checksum, full_path): + self.checksum = checksum + self.full_path = full_path + self.base_name = posixpath.basename(self.full_path) + + +def make_random_host_files(in_dir, num_files): + min_size = 1 * (1 << 10) + max_size = 16 * (1 << 10) + + files = [] + for _ in xrange(num_files): + file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) + + size = random.randrange(min_size, max_size, 1024) + rand_str = os.urandom(size) + file_handle.write(rand_str) + file_handle.flush() + file_handle.close() + + md5 = compute_md5(rand_str) + files.append(HostFile(file_handle, md5)) + return files + + +def make_random_device_files(device, in_dir, num_files): + min_size = 1 * (1 << 10) + max_size = 16 * (1 << 10) + + files = [] + for file_num in xrange(num_files): + size = random.randrange(min_size, max_size, 1024) + + base_name = 'device_tmpfile' + str(file_num) + full_path = posixpath.join(in_dir, base_name) + + device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), + 'bs={}'.format(size), 'count=1']) + dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() + + files.append(DeviceFile(dev_md5, full_path)) + return files + + +class FileOperationsTest(DeviceTest): + SCRATCH_DIR = '/data/local/tmp' + DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' + DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' + + def _test_push(self, local_file, checksum): + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) + self.device.push(local=local_file, remote=self.DEVICE_TEMP_FILE) + dev_md5, _ = self.device.shell([get_md5_prog(self.device), + self.DEVICE_TEMP_FILE])[0].split() + self.assertEqual(checksum, dev_md5) + self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) + + def test_push(self): + """Push a randomly generated file to specified device.""" + kbytes = 512 + tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) + rand_str = os.urandom(1024 * kbytes) + tmp.write(rand_str) + tmp.close() + self._test_push(tmp.name, compute_md5(rand_str)) + os.remove(tmp.name) + + # TODO: write push directory test. + + def _test_pull(self, remote_file, checksum): + tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) + tmp_write.close() + self.device.pull(remote=remote_file, local=tmp_write.name) + with open(tmp_write.name, 'rb') as tmp_read: + host_contents = tmp_read.read() + host_md5 = compute_md5(host_contents) + self.assertEqual(checksum, host_md5) + os.remove(tmp_write.name) + + def test_pull(self): + """Pull a randomly generated file from specified device.""" + kbytes = 512 + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) + cmd = ['dd', 'if=/dev/urandom', + 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', + 'count={}'.format(kbytes)] + self.device.shell(cmd) + dev_md5, _ = self.device.shell( + [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() + self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) + self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) + + def test_pull_dir(self): + """Pull a randomly generated directory of files from the device.""" + host_dir = tempfile.mkdtemp() + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) + + # Populate device directory with random files. + temp_files = make_random_device_files( + self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) + + self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) + + for temp_file in temp_files: + host_path = os.path.join(host_dir, temp_file.base_name) + with open(host_path, 'rb') as host_file: + host_md5 = compute_md5(host_file.read()) + self.assertEqual(host_md5, temp_file.checksum) + + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + if host_dir is not None: + shutil.rmtree(host_dir) + + def test_sync(self): + """Sync a randomly generated directory of files to specified device.""" + base_dir = tempfile.mkdtemp() + + # Create mirror device directory hierarchy within base_dir. + full_dir_path = base_dir + self.DEVICE_TEMP_DIR + os.makedirs(full_dir_path) + + # Create 32 random files within the host mirror. + temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32) + + # Clean up any trash on the device. + device = adb.get_device(product=base_dir) + device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + + device.sync('data') + + # Confirm that every file on the device mirrors that on the host. + for temp_file in temp_files: + device_full_path = posixpath.join(self.DEVICE_TEMP_DIR, + temp_file.base_name) + dev_md5, _ = device.shell( + [get_md5_prog(self.device), device_full_path])[0].split() + self.assertEqual(temp_file.checksum, dev_md5) + + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + if base_dir is not None: + shutil.rmtree(base_dir) + + def test_unicode_paths(self): + """Ensure that we can support non-ASCII paths, even on Windows.""" + name = u'로보카 폴리' + + ## push. + tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) + tf.close() + self.device.push(tf.name, u'/data/local/tmp/adb-test-{}'.format(name)) + os.remove(tf.name) + self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) + + # pull. + cmd = ['touch', u'"/data/local/tmp/adb-test-{}"'.format(name)] + self.device.shell(cmd) + + tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) + tf.close() + self.device.pull(u'/data/local/tmp/adb-test-{}'.format(name), tf.name) + os.remove(tf.name) + self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) + + +def main(): + random.seed(0) + if len(adb.get_devices()) > 0: + suite = unittest.TestLoader().loadTestsFromName(__name__) + unittest.TextTestRunner(verbosity=3).run(suite) + else: + print('Test suite must be run with attached devices') + + +if __name__ == '__main__': + main()