2023-05-14 07:48:34 +00:00
|
|
|
import random
|
|
|
|
import re
|
|
|
|
import socket
|
|
|
|
import time
|
2023-05-22 16:07:38 +00:00
|
|
|
import typing as t
|
2023-05-14 07:48:34 +00:00
|
|
|
|
|
|
|
import uiautomator2 as u2
|
2023-05-16 15:39:18 +00:00
|
|
|
from adbutils import AdbTimeout
|
2023-05-14 07:48:34 +00:00
|
|
|
from lxml import etree
|
|
|
|
|
2023-05-16 15:39:18 +00:00
|
|
|
try:
|
|
|
|
# adbutils 0.x
|
|
|
|
from adbutils import _AdbStreamConnection as AdbConnection
|
|
|
|
except ImportError:
|
|
|
|
# adbutils >= 1.0
|
|
|
|
from adbutils import AdbConnection
|
2023-05-22 16:07:38 +00:00
|
|
|
# Patch list2cmdline back to subprocess.list2cmdline
|
|
|
|
# We expect `screencap | nc 192.168.0.1 20298` instead of `screencap '|' nc 192.168.80.1 20298`
|
|
|
|
import adbutils
|
|
|
|
import subprocess
|
|
|
|
adbutils._utils.list2cmdline = subprocess.list2cmdline
|
|
|
|
adbutils._device.list2cmdline = subprocess.list2cmdline
|
|
|
|
|
|
|
|
# BaseDevice.shell() is missing a check_okay() call before reading output,
|
|
|
|
# resulting in an `OKAY` prefix in output.
|
|
|
|
def shell(self,
|
|
|
|
cmdargs: t.Union[str, list, tuple],
|
|
|
|
stream: bool = False,
|
|
|
|
timeout: t.Optional[float] = None,
|
|
|
|
rstrip=True) -> t.Union[AdbConnection, str]:
|
|
|
|
if isinstance(cmdargs, (list, tuple)):
|
|
|
|
cmdargs = subprocess.list2cmdline(cmdargs)
|
|
|
|
if stream:
|
|
|
|
timeout = None
|
|
|
|
c = self.open_transport(timeout=timeout)
|
|
|
|
c.send_command("shell:" + cmdargs)
|
|
|
|
c.check_okay() # check_okay() is missing here
|
|
|
|
if stream:
|
|
|
|
return c
|
|
|
|
output = c.read_until_close()
|
|
|
|
return output.rstrip() if rstrip else output
|
|
|
|
|
|
|
|
adbutils._device.BaseDevice.shell = shell
|
2023-05-16 15:39:18 +00:00
|
|
|
|
2023-05-14 07:48:34 +00:00
|
|
|
from module.base.decorator import cached_property
|
|
|
|
from module.logger import logger
|
|
|
|
|
|
|
|
RETRY_TRIES = 5
|
|
|
|
RETRY_DELAY = 3
|
|
|
|
|
|
|
|
|
|
|
|
def is_port_using(port_num):
|
|
|
|
""" if port is using by others, return True. else return False """
|
|
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
s.settimeout(2)
|
|
|
|
|
|
|
|
try:
|
|
|
|
s.bind(('127.0.0.1', port_num))
|
|
|
|
return False
|
|
|
|
except OSError:
|
|
|
|
# Address already bind
|
|
|
|
return True
|
|
|
|
finally:
|
|
|
|
s.close()
|
|
|
|
|
|
|
|
|
|
|
|
def random_port(port_range):
|
|
|
|
""" get a random port from port set """
|
|
|
|
new_port = random.choice(list(range(*port_range)))
|
|
|
|
if is_port_using(new_port):
|
|
|
|
return random_port(port_range)
|
|
|
|
else:
|
|
|
|
return new_port
|
|
|
|
|
|
|
|
|
|
|
|
def recv_all(stream, chunk_size=4096, recv_interval=0.000) -> bytes:
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
stream:
|
|
|
|
chunk_size:
|
|
|
|
recv_interval (float): Default to 0.000, use 0.001 if receiving as server
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
bytes:
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
AdbTimeout
|
|
|
|
"""
|
2023-05-16 15:39:18 +00:00
|
|
|
if isinstance(stream, AdbConnection):
|
2023-05-14 07:48:34 +00:00
|
|
|
stream = stream.conn
|
|
|
|
stream.settimeout(10)
|
|
|
|
else:
|
|
|
|
stream.settimeout(10)
|
|
|
|
|
|
|
|
try:
|
|
|
|
fragments = []
|
|
|
|
while 1:
|
|
|
|
chunk = stream.recv(chunk_size)
|
|
|
|
if chunk:
|
|
|
|
fragments.append(chunk)
|
|
|
|
# See https://stackoverflow.com/questions/23837827/python-server-program-has-high-cpu-usage/41749820#41749820
|
|
|
|
time.sleep(recv_interval)
|
|
|
|
else:
|
|
|
|
break
|
|
|
|
return remove_shell_warning(b''.join(fragments))
|
|
|
|
except socket.timeout:
|
|
|
|
raise AdbTimeout('adb read timeout')
|
|
|
|
|
|
|
|
|
|
|
|
def possible_reasons(*args):
|
|
|
|
"""
|
|
|
|
Show possible reasons
|
|
|
|
|
|
|
|
Possible reason #1: <reason_1>
|
|
|
|
Possible reason #2: <reason_2>
|
|
|
|
"""
|
|
|
|
for index, reason in enumerate(args):
|
|
|
|
index += 1
|
|
|
|
logger.critical(f'Possible reason #{index}: {reason}')
|
|
|
|
|
|
|
|
|
|
|
|
class PackageNotInstalled(Exception):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class ImageTruncated(Exception):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
def retry_sleep(trial):
|
|
|
|
# First trial
|
|
|
|
if trial == 0:
|
|
|
|
pass
|
|
|
|
# Failed once, fast retry
|
|
|
|
elif trial == 1:
|
|
|
|
pass
|
|
|
|
# Failed twice
|
|
|
|
elif trial == 2:
|
|
|
|
time.sleep(1)
|
|
|
|
# Failed more
|
|
|
|
else:
|
|
|
|
time.sleep(RETRY_DELAY)
|
|
|
|
|
|
|
|
|
|
|
|
def handle_adb_error(e):
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
e (Exception):
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
bool: If should retry
|
|
|
|
"""
|
|
|
|
text = str(e)
|
|
|
|
if 'not found' in text:
|
|
|
|
# When you call `adb disconnect <serial>`
|
|
|
|
# Or when adb server was killed (low possibility)
|
|
|
|
# AdbError(device '127.0.0.1:59865' not found)
|
|
|
|
logger.error(e)
|
|
|
|
return True
|
|
|
|
elif 'timeout' in text:
|
|
|
|
# AdbTimeout(adb read timeout)
|
|
|
|
logger.error(e)
|
|
|
|
return True
|
|
|
|
elif 'closed' in text:
|
|
|
|
# AdbError(closed)
|
|
|
|
# Usually after AdbTimeout(adb read timeout)
|
|
|
|
# Disconnect and re-connect should fix this.
|
|
|
|
logger.error(e)
|
|
|
|
return True
|
|
|
|
elif 'device offline' in text:
|
|
|
|
# AdbError(device offline)
|
|
|
|
# When a device that has been connected wirelessly is disconnected passively,
|
|
|
|
# it does not disappear from the adb device list,
|
|
|
|
# but will be displayed as offline.
|
|
|
|
# In many cases, such as disconnection and recovery caused by network fluctuations,
|
|
|
|
# or after VMOS reboot when running Alas on a phone,
|
|
|
|
# the device is still available, but it needs to be disconnected and re-connected.
|
|
|
|
logger.error(e)
|
|
|
|
return True
|
|
|
|
elif 'is offline' in text:
|
|
|
|
# RuntimeError: USB device 127.0.0.1:7555 is offline
|
|
|
|
# Raised by uiautomator2 when current adb service is killed by another version of adb service.
|
|
|
|
logger.error(e)
|
|
|
|
return True
|
|
|
|
elif 'unknown host service' in text:
|
|
|
|
# AdbError(unknown host service)
|
|
|
|
# Another version of ADB service started, current ADB service has been killed.
|
|
|
|
# Usually because user opened a Chinese emulator, which uses ADB from the Stone Age.
|
|
|
|
logger.error(e)
|
|
|
|
return True
|
|
|
|
else:
|
|
|
|
# AdbError()
|
|
|
|
logger.exception(e)
|
|
|
|
possible_reasons(
|
|
|
|
'If you are using BlueStacks or LD player or WSA, please enable ADB in the settings of your emulator',
|
|
|
|
'Emulator died, please restart emulator',
|
|
|
|
'Serial incorrect, no such device exists or emulator is not running'
|
|
|
|
)
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def get_serial_pair(serial):
|
|
|
|
"""
|
|
|
|
Args:
|
|
|
|
serial (str):
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str, str: `127.0.0.1:5555+{X}` and `emulator-5554+{X}`, 0 <= X <= 32
|
|
|
|
"""
|
|
|
|
if serial.startswith('127.0.0.1:'):
|
|
|
|
try:
|
|
|
|
port = int(serial[10:])
|
|
|
|
if 5555 <= port <= 5555 + 32:
|
|
|
|
return f'127.0.0.1:{port}', f'emulator-{port - 1}'
|
|
|
|
except (ValueError, IndexError):
|
|
|
|
pass
|
|
|
|
if serial.startswith('emulator-'):
|
|
|
|
try:
|
|
|
|
port = int(serial[9:])
|
|
|
|
if 5554 <= port <= 5554 + 32:
|
|
|
|
return f'127.0.0.1:{port + 1}', f'emulator-{port}'
|
|
|
|
except (ValueError, IndexError):
|
|
|
|
pass
|
|
|
|
|
|
|
|
return None, None
|
|
|
|
|
|
|
|
|
|
|
|
def remove_prefix(s, prefix):
|
|
|
|
"""
|
|
|
|
Remove prefix of a string or bytes like `string.removeprefix(prefix)`, which is on Python3.9+
|
|
|
|
|
|
|
|
Args:
|
|
|
|
s (str, bytes):
|
|
|
|
prefix (str, bytes):
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str, bytes:
|
|
|
|
"""
|
|
|
|
return s[len(prefix):] if s.startswith(prefix) else s
|
|
|
|
|
|
|
|
|
|
|
|
def remove_suffix(s, suffix):
|
|
|
|
"""
|
|
|
|
Remove suffix of a string or bytes like `string.removesuffix(suffix)`, which is on Python3.9+
|
|
|
|
|
|
|
|
Args:
|
|
|
|
s (str, bytes):
|
|
|
|
suffix (str, bytes):
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str, bytes:
|
|
|
|
"""
|
|
|
|
return s[:len(suffix)] if s.endswith(suffix) else s
|
|
|
|
|
|
|
|
|
|
|
|
def remove_shell_warning(s):
|
|
|
|
"""
|
|
|
|
Remove warnings from shell
|
|
|
|
|
|
|
|
Args:
|
|
|
|
s (str, bytes):
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str, bytes:
|
|
|
|
"""
|
|
|
|
# WARNING: linker: [vdso]: unused DT entry: type 0x70000001 arg 0x0\n\x89PNG\r\n\x1a\n\x00\x00\x00\rIH
|
|
|
|
if isinstance(s, bytes):
|
|
|
|
if s.startswith(b'WARNING'):
|
|
|
|
try:
|
|
|
|
s = s.split(b'\n', maxsplit=1)[1]
|
|
|
|
except IndexError:
|
|
|
|
pass
|
|
|
|
return s
|
|
|
|
# return re.sub(b'^WARNING.+\n', b'', s)
|
|
|
|
elif isinstance(s, str):
|
|
|
|
if s.startswith('WARNING'):
|
|
|
|
try:
|
|
|
|
s = s.split('\n', maxsplit=1)[1]
|
|
|
|
except IndexError:
|
|
|
|
pass
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
|
|
|
class IniterNoMinicap(u2.init.Initer):
|
|
|
|
@property
|
|
|
|
def minicap_urls(self):
|
|
|
|
"""
|
|
|
|
Don't install minicap on emulators, return empty urls.
|
|
|
|
|
|
|
|
binary from https://github.com/openatx/stf-binaries
|
|
|
|
only got abi: armeabi-v7a and arm64-v8a
|
|
|
|
"""
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
class Device(u2.Device):
|
|
|
|
def show_float_window(self, show=True):
|
|
|
|
"""
|
|
|
|
Don't show float windows.
|
|
|
|
"""
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# Monkey patch
|
|
|
|
u2.init.Initer = IniterNoMinicap
|
|
|
|
u2.Device = Device
|
|
|
|
|
|
|
|
|
|
|
|
class HierarchyButton:
|
|
|
|
"""
|
|
|
|
Convert UI hierarchy to an object like the Button in Alas.
|
|
|
|
"""
|
|
|
|
_name_regex = re.compile('@.*?=[\'\"](.*?)[\'\"]')
|
|
|
|
|
|
|
|
def __init__(self, hierarchy: etree._Element, xpath: str):
|
|
|
|
self.hierarchy = hierarchy
|
|
|
|
self.xpath = xpath
|
|
|
|
self.nodes = hierarchy.xpath(xpath)
|
|
|
|
|
|
|
|
@cached_property
|
|
|
|
def name(self):
|
|
|
|
res = HierarchyButton._name_regex.findall(self.xpath)
|
|
|
|
if res:
|
|
|
|
return res[0]
|
|
|
|
else:
|
|
|
|
return 'HierarchyButton'
|
|
|
|
|
|
|
|
@cached_property
|
|
|
|
def count(self):
|
|
|
|
return len(self.nodes)
|
|
|
|
|
|
|
|
@cached_property
|
|
|
|
def exist(self):
|
|
|
|
return self.count == 1
|
|
|
|
|
|
|
|
@cached_property
|
|
|
|
def area(self):
|
|
|
|
if self.exist:
|
|
|
|
bounds = self.nodes[0].attrib.get("bounds")
|
|
|
|
lx, ly, rx, ry = map(int, re.findall(r"\d+", bounds))
|
|
|
|
return lx, ly, rx, ry
|
|
|
|
else:
|
|
|
|
return None
|
|
|
|
|
|
|
|
@cached_property
|
|
|
|
def button(self):
|
|
|
|
return self.area
|
|
|
|
|
|
|
|
def __bool__(self):
|
|
|
|
return self.exist
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
return self.name
|
|
|
|
|
|
|
|
@cached_property
|
|
|
|
def focused(self):
|
|
|
|
if self.exist:
|
|
|
|
return self.nodes[0].attrib.get("focused").lower() == 'true'
|
|
|
|
else:
|
|
|
|
return False
|