StarRailCopilot/module/device/method/uiautomator_2.py
2024-04-14 19:11:32 +08:00

327 lines
10 KiB
Python

import typing as t
from dataclasses import dataclass
from functools import wraps
from json.decoder import JSONDecodeError
from subprocess import list2cmdline
import uiautomator2 as u2
from adbutils.errors import AdbError
from lxml import etree
from module.base.utils import *
from module.device.connection import Connection
from module.device.method.utils import (RETRY_TRIES, retry_sleep, handle_adb_error,
ImageTruncated, PackageNotInstalled, possible_reasons)
from module.exception import RequestHumanTakeover
from module.logger import logger
def retry(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
"""
Args:
self (Uiautomator2):
"""
init = None
for _ in range(RETRY_TRIES):
try:
if callable(init):
retry_sleep(_)
init()
return func(self, *args, **kwargs)
# Can't handle
except RequestHumanTakeover:
break
# When adb server was killed
except ConnectionResetError as e:
logger.error(e)
def init():
self.adb_reconnect()
# In `device.set_new_command_timeout(604800)`
# json.decoder.JSONDecodeError: Expecting value: line 1 column 2 (char 1)
except JSONDecodeError as e:
logger.error(e)
def init():
self.install_uiautomator2()
# AdbError
except AdbError as e:
if handle_adb_error(e):
def init():
self.adb_reconnect()
else:
break
# RuntimeError: USB device 127.0.0.1:5555 is offline
except RuntimeError as e:
if handle_adb_error(e):
def init():
self.adb_reconnect()
else:
break
# In `assert c.read string(4) == _OKAY`
# ADB on emulator not enabled
except AssertionError as e:
logger.exception(e)
possible_reasons(
'If you are using BlueStacks or LD player or WSA, '
'please enable ADB in the settings of your emulator'
)
break
# Package not installed
except PackageNotInstalled as e:
logger.error(e)
def init():
self.detect_package()
# ImageTruncated
except ImageTruncated as e:
logger.error(e)
def init():
pass
# Unknown
except Exception as e:
logger.exception(e)
def init():
pass
logger.critical(f'Retry {func.__name__}() failed')
raise RequestHumanTakeover
return retry_wrapper
@dataclass
class ProcessInfo:
pid: int
ppid: int
thread_count: int
cmdline: str
name: str
@dataclass
class ShellBackgroundResponse:
success: bool
pid: int
description: str
class Uiautomator2(Connection):
@retry
def screenshot_uiautomator2(self):
image = self.u2.screenshot(format='raw')
image = np.frombuffer(image, np.uint8)
if image is None:
raise ImageTruncated('Empty image after reading from buffer')
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
if image is None:
raise ImageTruncated('Empty image after cv2.imdecode')
cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)
if image is None:
raise ImageTruncated('Empty image after cv2.cvtColor')
return image
@retry
def click_uiautomator2(self, x, y):
self.u2.click(x, y)
@retry
def long_click_uiautomator2(self, x, y, duration=(1, 1.2)):
self.u2.long_click(x, y, duration=duration)
@retry
def swipe_uiautomator2(self, p1, p2, duration=0.1):
self.u2.swipe(*p1, *p2, duration=duration)
@retry
def _drag_along(self, path):
"""Swipe following path.
Args:
path (list): (x, y, sleep)
Examples:
al.drag_along([
(403, 421, 0.2),
(821, 326, 0.1),
(821, 326-10, 0.1),
(821, 326+10, 0.1),
(821, 326, 0),
])
Equals to:
al.device.touch.down(403, 421)
time.sleep(0.2)
al.device.touch.move(821, 326)
time.sleep(0.1)
al.device.touch.move(821, 326-10)
time.sleep(0.1)
al.device.touch.move(821, 326+10)
time.sleep(0.1)
al.device.touch.up(821, 326)
"""
length = len(path)
for index, data in enumerate(path):
x, y, second = data
if index == 0:
self.u2.touch.down(x, y)
logger.info(point2str(x, y) + ' down')
elif index - length == -1:
self.u2.touch.up(x, y)
logger.info(point2str(x, y) + ' up')
else:
self.u2.touch.move(x, y)
logger.info(point2str(x, y) + ' move')
self.sleep(second)
def drag_uiautomator2(self, p1, p2, segments=1, shake=(0, 15), point_random=(-10, -10, 10, 10),
shake_random=(-5, -5, 5, 5), swipe_duration=0.25, shake_duration=0.1):
"""Drag and shake, like:
/\
+-----------+ + +
\/
A simple swipe or drag don't work well, because it only has two points.
Add some way point to make it more like swipe.
Args:
p1 (tuple): Start point, (x, y).
p2 (tuple): End point, (x, y).
segments (int):
shake (tuple): Shake after arrive end point.
point_random: Add random to start point and end point.
shake_random: Add random to shake array.
swipe_duration: Duration between way points.
shake_duration: Duration between shake points.
"""
p1 = np.array(p1) - random_rectangle_point(point_random)
p2 = np.array(p2) - random_rectangle_point(point_random)
path = [(x, y, swipe_duration) for x, y in random_line_segments(p1, p2, n=segments, random_range=point_random)]
path += [
(*p2 + shake + random_rectangle_point(shake_random), shake_duration),
(*p2 - shake - random_rectangle_point(shake_random), shake_duration),
(*p2, shake_duration)
]
path = [(int(x), int(y), d) for x, y, d in path]
self._drag_along(path)
@retry
def app_current_uiautomator2(self):
"""
Returns:
str: Package name.
"""
result = self.u2.app_current()
return result['package']
@retry
def app_start_uiautomator2(self, package_name=None):
if not package_name:
package_name = self.package
try:
self.u2.app_start(package_name)
except u2.exceptions.BaseError as e:
# BaseError: package "com.bilibili.azurlane" not found
logger.error(e)
raise PackageNotInstalled(package_name)
@retry
def app_stop_uiautomator2(self, package_name=None):
if not package_name:
package_name = self.package
self.u2.app_stop(package_name)
@retry
def dump_hierarchy_uiautomator2(self) -> etree._Element:
content = self.u2.dump_hierarchy(compressed=True)
hierarchy = etree.fromstring(content.encode('utf-8'))
return hierarchy
@retry
def resolution_uiautomator2(self, cal_rotation=True) -> t.Tuple[int, int]:
"""
Faster u2.window_size(), cause that calls `dumpsys display` twice.
Returns:
(width, height)
"""
info = self.u2.http.get('/info').json()
w, h = info['display']['width'], info['display']['height']
if cal_rotation:
rotation = self.get_orientation()
if (w > h) != (rotation % 2 == 1):
w, h = h, w
return w, h
def resolution_check_uiautomator2(self):
"""
Alas does not actively check resolution but the width and height of screenshots.
However, some screenshot methods do not provide device resolution, so check it here.
Returns:
(width, height)
Raises:
RequestHumanTakeover: If resolution is not 1280x720
"""
width, height = self.resolution_uiautomator2()
logger.attr('Screen_size', f'{width}x{height}')
if width == 1280 and height == 720:
return (width, height)
if width == 720 and height == 1280:
return (width, height)
logger.critical(f'Resolution not supported: {width}x{height}')
logger.critical('Please set emulator resolution to 1280x720')
raise RequestHumanTakeover
@retry
def proc_list_uiautomator2(self) -> t.List[ProcessInfo]:
"""
Get info about current processes.
"""
resp = self.u2.http.get("/proc/list", timeout=10)
resp.raise_for_status()
result = [
ProcessInfo(
pid=proc['pid'],
ppid=proc['ppid'],
thread_count=proc['threadCount'],
cmdline=' '.join(proc['cmdline']) if proc['cmdline'] is not None else '',
name=proc['name'],
) for proc in resp.json()
]
return result
@retry
def u2_shell_background(self, cmdline, timeout=10) -> ShellBackgroundResponse:
"""
Run at background.
Note that this function will always return a success response,
as this is a untested and hidden method in ATX.
"""
if isinstance(cmdline, (list, tuple)):
cmdline = list2cmdline(cmdline)
elif isinstance(cmdline, str):
cmdline = cmdline
else:
raise TypeError("cmdargs type invalid", type(cmdline))
data = dict(command=cmdline, timeout=str(timeout))
ret = self.u2.http.post("/shell/background", data=data, timeout=timeout + 10)
ret.raise_for_status()
resp = ret.json()
resp = ShellBackgroundResponse(
success=bool(resp.get('success', False)),
pid=resp.get('pid', 0),
description=resp.get('description', '')
)
return resp