mirror of
https://github.com/LmeSzinc/StarRailCopilot.git
synced 2024-11-16 14:31:16 +00:00
326 lines
10 KiB
Python
326 lines
10 KiB
Python
import typing as t
|
|
from dataclasses import dataclass
|
|
from functools import wraps
|
|
from json.decoder import JSONDecodeError
|
|
from subprocess import list2cmdline
|
|
|
|
import uiautomator2 as u2
|
|
from adbutils.errors import AdbError
|
|
from lxml import etree
|
|
|
|
from module.base.utils import *
|
|
from module.device.connection import Connection
|
|
from module.device.method.utils import (RETRY_TRIES, retry_sleep, handle_adb_error,
|
|
ImageTruncated, PackageNotInstalled, possible_reasons)
|
|
from module.exception import RequestHumanTakeover
|
|
from module.logger import logger
|
|
|
|
|
|
def retry(func):
    """Decorate a `Uiautomator2` method so that known failures are retried.

    The wrapped method is attempted up to `RETRY_TRIES` times. Every handled
    exception picks a recovery action; on the next attempt `retry_sleep()` is
    called and the recovery action is run before the method is called again.
    Failures that cannot be recovered leave the loop early. If no attempt
    returns, a critical message is logged and `RequestHumanTakeover` is raised.
    """

    @wraps(func)
    def retry_wrapper(self, *args, **kwargs):
        """
        Args:
            self (Uiautomator2):
        """

        # Recovery actions, resolved on `self` only at the moment they run.
        def reconnect():
            self.adb_reconnect()

        def reinstall():
            self.install_uiautomator2()

        def redetect():
            self.detect_package()

        def nothing():
            pass

        recover = None
        for attempt in range(RETRY_TRIES):
            try:
                # Only set after a failed attempt, so the first try runs directly.
                if callable(recover):
                    retry_sleep(attempt)
                    recover()
                return func(self, *args, **kwargs)
            except RequestHumanTakeover:
                # Can't handle
                break
            except ConnectionResetError as e:
                # When adb server was killed
                logger.error(e)
                recover = reconnect
            except JSONDecodeError as e:
                # In `device.set_new_command_timeout(604800)`
                # json.decoder.JSONDecodeError: Expecting value: line 1 column 2 (char 1)
                logger.error(e)
                recover = reinstall
            except (AdbError, RuntimeError) as e:
                # AdbError, or
                # RuntimeError: USB device 127.0.0.1:5555 is offline
                if not handle_adb_error(e):
                    break
                recover = reconnect
            except AssertionError as e:
                # In `assert c.read_string(4) == _OKAY`
                # ADB on emulator not enabled
                logger.exception(e)
                possible_reasons(
                    'If you are using BlueStacks or LD player or WSA, '
                    'please enable ADB in the settings of your emulator'
                )
                break
            except PackageNotInstalled as e:
                # Package not installed
                logger.error(e)
                recover = redetect
            except ImageTruncated as e:
                # Nothing to repair, just wait and try again
                logger.error(e)
                recover = nothing
            except Exception as e:
                # Unknown
                logger.exception(e)
                recover = nothing

        logger.critical(f'Retry {func.__name__}() failed')
        raise RequestHumanTakeover

    return retry_wrapper
|
|
|
|
|
|
@dataclass
class ProcessInfo:
    """One process entry, as built by `proc_list_uiautomator2()` from `/proc/list`."""

    # `pid` value of the entry
    pid: int
    # `ppid` value of the entry
    ppid: int
    # `threadCount` value of the entry
    thread_count: int
    # `cmdline` items joined with spaces, '' when the entry has no cmdline
    cmdline: str
    # `name` value of the entry
    name: str
|
|
|
|
|
|
@dataclass
class ShellBackgroundResponse:
    """Parsed reply of `/shell/background`, as built by `u2_shell_background()`."""

    # `success` value of the reply converted to bool
    success: bool
    # `pid` value of the reply
    pid: int
    # `description` value of the reply
    description: str
|
|
|
|
|
|
class Uiautomator2(Connection):
|
|
@retry
|
|
def screenshot_uiautomator2(self):
    """Take a screenshot through uiautomator2 and decode it.

    Returns:
        np.ndarray: Decoded screenshot, converted from BGR to RGB.

    Raises:
        ImageTruncated: If the device returned no data or the data could not
            be decoded into an image.
    """
    data = self.u2.screenshot(format='raw')
    image = np.frombuffer(data, np.uint8)
    # np.frombuffer() never returns None. An empty payload becomes a
    # zero-length array, so check the size to catch it before decoding.
    if image.size == 0:
        raise ImageTruncated('Empty image after reading from buffer')

    # cv2.imdecode() gives None when the buffer is not a valid image
    image = cv2.imdecode(image, cv2.IMREAD_COLOR)
    if image is None:
        raise ImageTruncated('Empty image after cv2.imdecode')

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    if image is None:
        raise ImageTruncated('Empty image after cv2.cvtColor')

    return image
|
|
|
|
@retry
|
|
def click_uiautomator2(self, x, y):
    """Click a point through uiautomator2.

    Args:
        x: X coordinate, forwarded to `u2.click()` unchanged.
        y: Y coordinate, forwarded to `u2.click()` unchanged.
    """
    device = self.u2
    device.click(x, y)
|
|
|
|
@retry
|
|
def long_click_uiautomator2(self, x, y, duration=(1, 1.2)):
    """Long click a point through uiautomator2.

    Args:
        x: X coordinate, forwarded to `u2.long_click()` unchanged.
        y: Y coordinate, forwarded to `u2.long_click()` unchanged.
        duration (int, float, tuple, list): Seconds to hold the touch.
            A (lower, upper) pair is resolved to a random value within that
            range, a number is forwarded as it is.
    """
    if isinstance(duration, (tuple, list)):
        # `u2.long_click()` takes a number of seconds, so the pair
        # (including the default) must be resolved before forwarding.
        import random
        lower, upper = duration
        duration = random.uniform(lower, upper)
    self.u2.long_click(x, y, duration=duration)
|
|
|
|
@retry
|
|
def swipe_uiautomator2(self, p1, p2, duration=0.1):
    """Swipe from one point to another through uiautomator2.

    Args:
        p1: Start point, unpacked into the leading arguments of `u2.swipe()`.
        p2: End point, unpacked right after `p1`.
        duration: Forwarded to `u2.swipe()` as `duration`.
    """
    coordinates = (*p1, *p2)
    self.u2.swipe(*coordinates, duration=duration)
|
|
|
|
@retry
|
|
def _drag_along(self, path):
    """Swipe following path.

    The first point is sent as touch down, the last point as touch up and
    every point in between as touch move. After each point the action is
    logged and `self.sleep()` is called with that point's sleep value.

    Args:
        path (list): (x, y, sleep)

    Examples:
        al.drag_along([
            (403, 421, 0.2),
            (821, 326, 0.1),
            (821, 326-10, 0.1),
            (821, 326+10, 0.1),
            (821, 326, 0),
        ])
        Equals to:
        al.device.touch.down(403, 421)
        time.sleep(0.2)
        al.device.touch.move(821, 326)
        time.sleep(0.1)
        al.device.touch.move(821, 326-10)
        time.sleep(0.1)
        al.device.touch.move(821, 326+10)
        time.sleep(0.1)
        al.device.touch.up(821, 326)
    """
    last = len(path) - 1
    for position, (x, y, pause) in enumerate(path):
        # The first-point test comes first: a single-point path only touches down.
        if position == 0:
            self.u2.touch.down(x, y)
            logger.info(point2str(x, y) + ' down')
        elif position == last:
            self.u2.touch.up(x, y)
            logger.info(point2str(x, y) + ' up')
        else:
            self.u2.touch.move(x, y)
            logger.info(point2str(x, y) + ' move')
        self.sleep(pause)
|
|
|
|
def drag_uiautomator2(self, p1, p2, segments=1, shake=(0, 15), point_random=(-10, -10, 10, 10),
                      shake_random=(-5, -5, 5, 5), swipe_duration=0.25, shake_duration=0.1):
    r"""Drag and shake, like:
                 /\
    +-----------+  +  +
                    \/
    A simple swipe or drag doesn't work well, because it only has two points.
    Add some way points to make it more like a swipe.

    Args:
        p1 (tuple): Start point, (x, y).
        p2 (tuple): End point, (x, y).
        segments (int): Passed to `random_line_segments()` as `n`.
        shake (tuple): Shake after arriving at the end point.
        point_random: Add random to start point and end point.
        shake_random: Add random to shake array.
        swipe_duration: Duration between way points.
        shake_duration: Duration between shake points.
    """
    start = np.array(p1) - random_rectangle_point(point_random)
    end = np.array(p2) - random_rectangle_point(point_random)

    # Way points from start to end, each followed by `swipe_duration`
    waypoints = [
        (x, y, swipe_duration)
        for x, y in random_line_segments(start, end, n=segments, random_range=point_random)
    ]

    # Shake around the end point: one side, the opposite side, then the end point itself
    forth = end + shake + random_rectangle_point(shake_random)
    back = end - shake - random_rectangle_point(shake_random)
    waypoints.extend([
        (*forth, shake_duration),
        (*back, shake_duration),
        (*end, shake_duration),
    ])

    # Coordinates are converted to plain int before being sent
    self._drag_along([(int(x), int(y), pause) for x, y, pause in waypoints])
|
|
|
|
@retry
|
|
def app_current_uiautomator2(self):
    """Read the `package` entry of what `u2.app_current()` reports.

    Returns:
        str: Package name.
    """
    return self.u2.app_current()['package']
|
|
|
|
@retry
|
|
def app_start_uiautomator2(self, package_name=None):
    """Start an app through uiautomator2.

    Args:
        package_name (str): Package to start, `self.package` is used when
            it is empty or None.

    Raises:
        PackageNotInstalled: If `u2.app_start()` raised `u2.exceptions.BaseError`.
    """
    target = package_name or self.package
    try:
        self.u2.app_start(target)
    except u2.exceptions.BaseError as e:
        # BaseError: package "com.bilibili.azurlane" not found
        logger.error(e)
        raise PackageNotInstalled(target)
|
|
|
|
@retry
|
|
def app_stop_uiautomator2(self, package_name=None):
    """Stop an app through uiautomator2.

    Args:
        package_name (str): Package to stop, `self.package` is used when
            it is empty or None.
    """
    target = package_name or self.package
    self.u2.app_stop(target)
|
|
|
|
@retry
|
|
def dump_hierarchy_uiautomator2(self) -> etree._Element:
    """Dump the UI hierarchy through uiautomator2 and parse it.

    Returns:
        etree._Element: Root element parsed from the compressed hierarchy dump.
    """
    # The dump arrives as str, it is parsed from its UTF-8 encoded bytes
    xml_bytes = self.u2.dump_hierarchy(compressed=True).encode('utf-8')
    return etree.fromstring(xml_bytes)
|
|
|
|
@retry
|
|
def resolution_uiautomator2(self) -> t.Tuple[int, int]:
    """
    Faster u2.window_size(), cause that calls `dumpsys display` twice.

    Width and height are read from the `display` entry of `/info` and are
    swapped when they disagree with the orientation from `get_orientation()`.

    Returns:
        (width, height)
    """
    display = self.u2.http.get('/info').json()['display']
    width, height = display['width'], display['height']
    rotation = self.get_orientation()

    wider_than_tall = width > height
    odd_rotation = rotation % 2 == 1
    # An odd rotation goes with width > height, otherwise the two are swapped
    if wider_than_tall == odd_rotation:
        return width, height
    return height, width
|
|
|
|
def resolution_check_uiautomator2(self):
    """
    Alas does not actively check resolution but the width and height of screenshots.
    However, some screenshot methods do not provide device resolution, so check it here.

    Returns:
        (width, height)

    Raises:
        RequestHumanTakeover: If resolution is neither 1280x720 nor 720x1280
    """
    width, height = self.resolution_uiautomator2()
    logger.attr('Screen_size', f'{width}x{height}')

    # Both orientations of the same resolution are accepted
    resolution = (width, height)
    if resolution in ((1280, 720), (720, 1280)):
        return resolution

    logger.critical(f'Resolution not supported: {width}x{height}')
    logger.critical('Please set emulator resolution to 1280x720')
    raise RequestHumanTakeover
|
|
|
|
@retry
|
|
def proc_list_uiautomator2(self) -> t.List[ProcessInfo]:
    """
    Get info about current processes.

    Requests `/proc/list` with a timeout of 10 and converts every entry of
    the JSON reply into a `ProcessInfo`.

    Returns:
        list[ProcessInfo]: One item per entry of the reply.
    """
    response = self.u2.http.get("/proc/list", timeout=10)
    response.raise_for_status()

    processes = []
    for entry in response.json():
        pid, ppid, threads = entry['pid'], entry['ppid'], entry['threadCount']
        # cmdline may be None, it is kept as an empty string then
        argv = entry['cmdline']
        cmdline = '' if argv is None else ' '.join(argv)
        processes.append(ProcessInfo(
            pid=pid,
            ppid=ppid,
            thread_count=threads,
            cmdline=cmdline,
            name=entry['name'],
        ))
    return processes
|
|
|
|
@retry
|
|
def u2_shell_background(self, cmdline, timeout=10) -> ShellBackgroundResponse:
    """
    Run at background.

    Note that this function will always return a success response,
    as this is an untested and hidden method in ATX.

    Args:
        cmdline (list, tuple, str): Command to run, a list or tuple is
            joined into one string with `list2cmdline()`.
        timeout: Sent to the service as a string, the HTTP request itself
            waits 10 more than this value.

    Returns:
        ShellBackgroundResponse: Built from the JSON reply, missing keys
            fall back to False, 0 and ''.

    Raises:
        TypeError: If `cmdline` is not a list, tuple or str.
    """
    if not isinstance(cmdline, (list, tuple, str)):
        raise TypeError("cmdargs type invalid", type(cmdline))
    command = cmdline if isinstance(cmdline, str) else list2cmdline(cmdline)

    payload = {'command': command, 'timeout': str(timeout)}
    reply = self.u2.http.post("/shell/background", data=payload, timeout=timeout + 10)
    reply.raise_for_status()

    body = reply.json()
    return ShellBackgroundResponse(
        success=bool(body.get('success', False)),
        pid=body.get('pid', 0),
        description=body.get('description', ''),
    )
|