import random import re import socket import time import typing as t import uiautomator2 as u2 from adbutils import AdbTimeout from lxml import etree try: # adbutils 0.x from adbutils import _AdbStreamConnection as AdbConnection except ImportError: # adbutils >= 1.0 from adbutils import AdbConnection # Patch list2cmdline back to subprocess.list2cmdline # We expect `screencap | nc 192.168.0.1 20298` instead of `screencap '|' nc 192.168.80.1 20298` import adbutils import subprocess adbutils._utils.list2cmdline = subprocess.list2cmdline adbutils._device.list2cmdline = subprocess.list2cmdline # BaseDevice.shell() is missing a check_okay() call before reading output, # resulting in an `OKAY` prefix in output. def shell(self, cmdargs: t.Union[str, list, tuple], stream: bool = False, timeout: t.Optional[float] = None, rstrip=True) -> t.Union[AdbConnection, str]: if isinstance(cmdargs, (list, tuple)): cmdargs = subprocess.list2cmdline(cmdargs) if stream: timeout = None c = self.open_transport(timeout=timeout) c.send_command("shell:" + cmdargs) c.check_okay() # check_okay() is missing here if stream: return c output = c.read_until_close() return output.rstrip() if rstrip else output adbutils._device.BaseDevice.shell = shell from module.base.decorator import cached_property from module.logger import logger RETRY_TRIES = 5 RETRY_DELAY = 3 def is_port_using(port_num): """ if port is using by others, return True. else return False """ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(2) try: s.bind(('127.0.0.1', port_num)) return False except OSError: # Address already bind return True finally: s.close() def random_port(port_range): """ get a random port from port set """ new_port = random.choice(list(range(*port_range))) if is_port_using(new_port): return random_port(port_range) else: return new_port def recv_all(stream, chunk_size=4096, recv_interval=0.000) -> bytes: """ Args: stream: chunk_size: recv_interval (float): Default to 0.000, use 0.001 if receiving as server Returns: bytes: Raises: AdbTimeout """ if isinstance(stream, AdbConnection): stream = stream.conn stream.settimeout(10) else: stream.settimeout(10) try: fragments = [] while 1: chunk = stream.recv(chunk_size) if chunk: fragments.append(chunk) # See https://stackoverflow.com/questions/23837827/python-server-program-has-high-cpu-usage/41749820#41749820 time.sleep(recv_interval) else: break return remove_shell_warning(b''.join(fragments)) except socket.timeout: raise AdbTimeout('adb read timeout') def possible_reasons(*args): """ Show possible reasons Possible reason #1: Possible reason #2: """ for index, reason in enumerate(args): index += 1 logger.critical(f'Possible reason #{index}: {reason}') class PackageNotInstalled(Exception): pass class ImageTruncated(Exception): pass def retry_sleep(trial): # First trial if trial == 0: pass # Failed once, fast retry elif trial == 1: pass # Failed twice elif trial == 2: time.sleep(1) # Failed more else: time.sleep(RETRY_DELAY) def handle_adb_error(e): """ Args: e (Exception): Returns: bool: If should retry """ text = str(e) if 'not found' in text: # When you call `adb disconnect ` # Or when adb server was killed (low possibility) # AdbError(device '127.0.0.1:59865' not found) logger.error(e) return True elif 'timeout' in text: # AdbTimeout(adb read timeout) logger.error(e) return True elif 'closed' in text: # AdbError(closed) # Usually after AdbTimeout(adb read timeout) # Disconnect and re-connect should fix this. logger.error(e) return True elif 'device offline' in text: # AdbError(device offline) # When a device that has been connected wirelessly is disconnected passively, # it does not disappear from the adb device list, # but will be displayed as offline. # In many cases, such as disconnection and recovery caused by network fluctuations, # or after VMOS reboot when running Alas on a phone, # the device is still available, but it needs to be disconnected and re-connected. logger.error(e) return True elif 'is offline' in text: # RuntimeError: USB device 127.0.0.1:7555 is offline # Raised by uiautomator2 when current adb service is killed by another version of adb service. logger.error(e) return True elif 'unknown host service' in text: # AdbError(unknown host service) # Another version of ADB service started, current ADB service has been killed. # Usually because user opened a Chinese emulator, which uses ADB from the Stone Age. logger.error(e) return True else: # AdbError() logger.exception(e) possible_reasons( 'If you are using BlueStacks or LD player or WSA, please enable ADB in the settings of your emulator', 'Emulator died, please restart emulator', 'Serial incorrect, no such device exists or emulator is not running' ) return False def get_serial_pair(serial): """ Args: serial (str): Returns: str, str: `127.0.0.1:5555+{X}` and `emulator-5554+{X}`, 0 <= X <= 32 """ if serial.startswith('127.0.0.1:'): try: port = int(serial[10:]) if 5555 <= port <= 5555 + 32: return f'127.0.0.1:{port}', f'emulator-{port - 1}' except (ValueError, IndexError): pass if serial.startswith('emulator-'): try: port = int(serial[9:]) if 5554 <= port <= 5554 + 32: return f'127.0.0.1:{port + 1}', f'emulator-{port}' except (ValueError, IndexError): pass return None, None def remove_prefix(s, prefix): """ Remove prefix of a string or bytes like `string.removeprefix(prefix)`, which is on Python3.9+ Args: s (str, bytes): prefix (str, bytes): Returns: str, bytes: """ return s[len(prefix):] if s.startswith(prefix) else s def remove_suffix(s, suffix): """ Remove suffix of a string or bytes like `string.removesuffix(suffix)`, which is on Python3.9+ Args: s (str, bytes): suffix (str, bytes): Returns: str, bytes: """ return s[:len(suffix)] if s.endswith(suffix) else s def remove_shell_warning(s): """ Remove warnings from shell Args: s (str, bytes): Returns: str, bytes: """ # WARNING: linker: [vdso]: unused DT entry: type 0x70000001 arg 0x0\n\x89PNG\r\n\x1a\n\x00\x00\x00\rIH if isinstance(s, bytes): if s.startswith(b'WARNING'): try: s = s.split(b'\n', maxsplit=1)[1] except IndexError: pass return s # return re.sub(b'^WARNING.+\n', b'', s) elif isinstance(s, str): if s.startswith('WARNING'): try: s = s.split('\n', maxsplit=1)[1] except IndexError: pass return s class IniterNoMinicap(u2.init.Initer): @property def minicap_urls(self): """ Don't install minicap on emulators, return empty urls. binary from https://github.com/openatx/stf-binaries only got abi: armeabi-v7a and arm64-v8a """ return [] class Device(u2.Device): def show_float_window(self, show=True): """ Don't show float windows. """ pass # Monkey patch u2.init.Initer = IniterNoMinicap u2.Device = Device class HierarchyButton: """ Convert UI hierarchy to an object like the Button in Alas. """ _name_regex = re.compile('@.*?=[\'\"](.*?)[\'\"]') def __init__(self, hierarchy: etree._Element, xpath: str): self.hierarchy = hierarchy self.xpath = xpath self.nodes = hierarchy.xpath(xpath) @cached_property def name(self): res = HierarchyButton._name_regex.findall(self.xpath) if res: return res[0] else: return 'HierarchyButton' @cached_property def count(self): return len(self.nodes) @cached_property def exist(self): return self.count == 1 @cached_property def area(self): if self.exist: bounds = self.nodes[0].attrib.get("bounds") lx, ly, rx, ry = map(int, re.findall(r"\d+", bounds)) return lx, ly, rx, ry else: return None @cached_property def button(self): return self.area def __bool__(self): return self.exist def __str__(self): return self.name @cached_property def focused(self): if self.exist: return self.nodes[0].attrib.get("focused").lower() == 'true' else: return False