#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import re
import subprocess
import tempfile


class FindDeviceError(RuntimeError):
    """Base class for errors raised while locating a device."""
    pass


class DeviceNotFoundError(FindDeviceError):
    """Raised when no attached device has the requested serial."""

    def __init__(self, serial):
        self.serial = serial
        super(DeviceNotFoundError, self).__init__(
            'No device with serial {}'.format(serial))


class NoUniqueDeviceError(FindDeviceError):
    """Raised when the number of attached devices is not exactly one."""

    def __init__(self):
        super(NoUniqueDeviceError, self).__init__('No unique device')


class ShellError(RuntimeError):
    """Raised by AndroidDevice.shell() when the exit code is non-zero.

    Attributes:
        cmd: The command that was passed to shell().
        stdout: Captured standard output.
        stderr: Captured standard error.
        exit_code: The non-zero exit code.
    """

    def __init__(self, cmd, stdout, stderr, exit_code):
        super(ShellError, self).__init__(
            '`{0}` exited with code {1}'.format(cmd, exit_code))
        self.cmd = cmd
        self.stdout = stdout
        self.stderr = stderr
        self.exit_code = exit_code


def get_devices():
    """Returns the serials listed by `adb devices`, skipping offline ones.

    Runs `adb start-server` first (with its output discarded) and then
    parses the output of `adb devices`.
    """
    with open(os.devnull, 'wb') as devnull:
        subprocess.check_call(['adb', 'start-server'], stdout=devnull,
                              stderr=devnull)
    out = subprocess.check_output(['adb', 'devices']).splitlines()

    # The first line of `adb devices` just says "List of attached devices", so
    # skip that.
    devices = []
    for line in out[1:]:
        if not line.strip():
            continue
        if 'offline' in line:
            continue
        # The serial is the first whitespace-delimited field. str.split()
        # (unlike unpacking a two-way re.split) cannot fail on a line that
        # has no whitespace and ignores any leading whitespace.
        devices.append(line.split(None, 1)[0])
    return devices


def _get_unique_device(product=None):
    """Returns the only attached device or raises NoUniqueDeviceError."""
    devices = get_devices()
    if len(devices) != 1:
        raise NoUniqueDeviceError()
    return AndroidDevice(devices[0], product)


def _get_device_by_serial(serial, product=None):
    """Returns the device with `serial` or raises DeviceNotFoundError."""
    if serial in get_devices():
        return AndroidDevice(serial, product)
    raise DeviceNotFoundError(serial)


def get_device(serial=None, product=None):
    """Get a uniquely identified AndroidDevice if one is available.

    Raises:
        DeviceNotFoundError:
            The serial specified by `serial` or $ANDROID_SERIAL is not
            connected.

        NoUniqueDeviceError:
            Neither `serial` nor $ANDROID_SERIAL was set, and the number of
            devices connected to the system is not 1. Having 0 connected
            devices will also result in this error.

    Returns:
        An AndroidDevice associated with the first non-None identifier in the
        following order of preference:

        1) The `serial` argument.
        2) The environment variable $ANDROID_SERIAL.
        3) The single device connected to the system.
    """
    if serial is not None:
        return _get_device_by_serial(serial, product)

    android_serial = os.getenv('ANDROID_SERIAL')
    if android_serial is not None:
        return _get_device_by_serial(android_serial, product)

    return _get_unique_device(product)


# Call this instead of subprocess.check_output() to work-around issue in Python
# 2's subprocess class on Windows where it doesn't support Unicode. This
# writes the command line to a UTF-8 batch file that is properly interpreted
# by cmd.exe.
def _subprocess_check_output(*popenargs, **kwargs):
    """Runs a command like subprocess.check_output() and returns its output.

    Args:
        *popenargs: Positional arguments as for subprocess.check_output();
            the first one is the argument list of the command.
        **kwargs: Keyword arguments forwarded to subprocess.check_output().

    Raises:
        subprocess.CalledProcessError: The command exited with a non-zero
            status. On the batch-file path the error carries the real
            command line rather than the cmd.exe invocation.
    """
    # Only do this slow work-around if Unicode is in the cmd line.
    # `unicode` is the Python 2 text type; the os.name test short-circuits
    # before it is evaluated on non-Windows hosts.
    if (os.name == 'nt' and
            any(isinstance(arg, unicode) for arg in popenargs[0])):
        # cmd.exe requires a suffix to know that it is running a batch file
        tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False)
        try:
            try:
                # @ in batch suppresses echo of the current line.
                # Change the codepage to 65001, the UTF-8 codepage.
                tf.write('@chcp 65001 > nul\r\n')
                tf.write('@')
                # Properly quote all the arguments and encode in UTF-8.
                tf.write(
                    subprocess.list2cmdline(popenargs[0]).encode('utf-8'))
            finally:
                # Always release the handle, even if a write failed, so the
                # file can be removed below.
                tf.close()
            try:
                return subprocess.check_output(['cmd.exe', '/c', tf.name],
                                               **kwargs)
            except subprocess.CalledProcessError as e:
                # Show real command line instead of the cmd.exe command line.
                raise subprocess.CalledProcessError(e.returncode,
                                                    popenargs[0],
                                                    output=e.output)
        finally:
            # The file was created with delete=False, so it has to be removed
            # here on every path, including failures while writing it.
            os.remove(tf.name)
    else:
        return subprocess.check_output(*popenargs, **kwargs)


class AndroidDevice(object):
    """Wrapper that runs `adb` commands against one device.

    Attributes:
        serial: Serial passed to `adb -s`, or None to omit the flag.
        product: Value passed to `adb -p`, or None to omit the flag.
        adb_cmd: The adb command prefix (list of strings) built from the two
            values above.
    """

    # Delimiter string to indicate the start of the exit code.
    _RETURN_CODE_DELIMITER = 'x'

    # Follow any shell command with this string to get the exit
    # status of a program since this isn't propagated by adb.
    #
    # The delimiter is needed because `printf 1; echo $?` would print
    # "10", and we wouldn't be able to distinguish the exit code.
    _RETURN_CODE_PROBE_STRING = 'echo "{0}$?"'.format(_RETURN_CODE_DELIMITER)

    # Maximum search distance from the output end to find the delimiter.
    # adb on Windows returns \r\n even if adbd returns \n.
    _RETURN_CODE_SEARCH_LENGTH = len('{0}255\r\n'.format(_RETURN_CODE_DELIMITER))

    # Shell protocol feature string.
    SHELL_PROTOCOL_FEATURE = 'shell_2'

    def __init__(self, serial, product=None):
        self.serial = serial
        self.product = product
        self.adb_cmd = ['adb']
        if self.serial is not None:
            self.adb_cmd.extend(['-s', serial])
        if self.product is not None:
            self.adb_cmd.extend(['-p', product])
        # Both values are fetched from adb on first use and then cached.
        self._linesep = None
        self._features = None

    @property
    def linesep(self):
        """Output of `adb shell echo`, i.e. the line separator adb emits."""
        if self._linesep is None:
            self._linesep = subprocess.check_output(self.adb_cmd +
                                                    ['shell', 'echo'])
        return self._linesep

    @property
    def features(self):
        """Lines printed by `adb features`, or [] if that command fails."""
        if self._features is None:
            try:
                self._features = self._simple_call(['features']).splitlines()
            except subprocess.CalledProcessError:
                self._features = []
        return self._features

    def _make_shell_cmd(self, user_cmd):
        """Builds the full `adb shell` argument list for `user_cmd`.

        Args:
            user_cmd: List of strings making up the shell command.

        Returns:
            The adb argument list. When the shell protocol feature is not
            listed in `features`, the exit code probe is appended so that
            the exit status can be recovered from the output.
        """
        command = self.adb_cmd + ['shell'] + user_cmd
        if self.SHELL_PROTOCOL_FEATURE not in self.features:
            command.append('; ' + self._RETURN_CODE_PROBE_STRING)
        return command

    def _parse_shell_output(self, out):
        """Finds the exit code string from shell output.

        Args:
            out: Shell output string.

        Returns:
            An (exit_code, output_string) tuple. The output string is
            cleaned of any additional stuff we appended to find the
            exit code.

        Raises:
            RuntimeError: Could not find the exit code in |out|.
        """
        search_text = out
        if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH:
            # We don't want to search over massive amounts of data when we know
            # the part we want is right at the end.
            search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:]
        partition = search_text.rpartition(self._RETURN_CODE_DELIMITER)
        if partition[1] == '':
            raise RuntimeError('Could not find exit status in shell output.')
        try:
            result = int(partition[2])
        except ValueError:
            # The delimiter character was present but was not followed by a
            # number, so it was not written by the exit code probe. Report
            # this as the documented RuntimeError instead of leaking a
            # ValueError to the caller.
            raise RuntimeError('Could not find exit status in shell output.')
        # partition[0] won't contain the full text if search_text was truncated,
        # pull from the original string instead.
        out = out[:-len(partition[1]) - len(partition[2])]
        return result, out

    def _simple_call(self, cmd):
        """Logs and runs `adb <cmd>`, returning stdout with stderr merged in.

        Args:
            cmd: List of string arguments appended to the adb command prefix.

        Raises:
            subprocess.CalledProcessError: adb exited with a non-zero status.
        """
        logging.info(' '.join(self.adb_cmd + cmd))
        return _subprocess_check_output(
            self.adb_cmd + cmd, stderr=subprocess.STDOUT)

    def shell(self, cmd):
        """Calls `adb shell`

        Args:
            cmd: list of strings making up the shell command to execute.

        Returns:
            A (stdout, stderr) tuple. Stderr may be combined into stdout
            if the device doesn't support separate streams.

        Raises:
            ShellError: the exit code was non-zero.
        """
        exit_code, stdout, stderr = self.shell_nocheck(cmd)
        if exit_code != 0:
            raise ShellError(cmd, stdout, stderr, exit_code)
        return stdout, stderr

    def shell_nocheck(self, cmd):
        """Calls `adb shell`

        Args:
            cmd: list of strings making up the shell command to execute.

        Returns:
            An (exit_code, stdout, stderr) tuple. Stderr may be combined
            into stdout if the device doesn't support separate streams.
        """
        cmd = self._make_shell_cmd(cmd)
        logging.info(' '.join(cmd))
        p = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if self.SHELL_PROTOCOL_FEATURE in self.features:
            exit_code = p.returncode
        else:
            # Without the shell protocol the exit code is recovered from the
            # probe string that _make_shell_cmd() appended to the command.
            exit_code, stdout = self._parse_shell_output(stdout)
        return exit_code, stdout, stderr

    def install(self, filename, replace=False):
        """Runs `adb install [-r] <filename>`; `-r` is added if `replace`."""
        cmd = ['install']
        if replace:
            cmd.append('-r')
        cmd.append(filename)
        return self._simple_call(cmd)

    def push(self, local, remote):
        return self._simple_call(['push', local, remote])

    def pull(self, remote, local):
        return self._simple_call(['pull', remote, local])

    def sync(self, directory=None):
        """Runs `adb sync`, optionally followed by `directory`."""
        cmd = ['sync']
        if directory is not None:
            cmd.append(directory)
        return self._simple_call(cmd)

    def forward(self, local, remote):
        return self._simple_call(['forward', local, remote])

    def tcpip(self, port):
        """Runs `adb tcpip <port>`.

        Args:
            port: Port as a string or an integer.
        """
        # Command arguments must be strings (they are joined for logging and
        # handed to subprocess), so accept integer ports as well.
        return self._simple_call(['tcpip', str(port)])

    def usb(self):
        return self._simple_call(['usb'])

    def reboot(self):
        return self._simple_call(['reboot'])

    def root(self):
        return self._simple_call(['root'])

    def unroot(self):
        return self._simple_call(['unroot'])

    def forward_remove(self, local):
        return self._simple_call(['forward', '--remove', local])

    def forward_remove_all(self):
        return self._simple_call(['forward', '--remove-all'])

    def connect(self, host):
        return self._simple_call(['connect', host])

    def disconnect(self, host):
        return self._simple_call(['disconnect', host])

    def reverse(self, remote, local):
        return self._simple_call(['reverse', remote, local])

    def reverse_remove_all(self):
        return self._simple_call(['reverse', '--remove-all'])

    def reverse_remove(self, remote):
        return self._simple_call(['reverse', '--remove', remote])

    def wait(self):
        return self._simple_call(['wait-for-device'])

    def get_prop(self, prop_name):
        """Returns the value printed by `getprop <prop_name>`.

        Returns:
            The single output line, or None if that line is blank.

        Raises:
            RuntimeError: getprop printed no lines or more than one line.
            ShellError: getprop exited with a non-zero status.
        """
        output = self.shell(['getprop', prop_name])[0].splitlines()
        if not output:
            # Zero lines is not "too many"; report it accurately.
            raise RuntimeError('No output from getprop')
        if len(output) > 1:
            raise RuntimeError('Too many lines in getprop output:\n' +
                               '\n'.join(output))
        value = output[0]
        if not value.strip():
            return None
        return value

    def set_prop(self, prop_name, value):
        """Runs `setprop <prop_name> <value>` on the device."""
        self.shell(['setprop', prop_name, value])