StarRailCopilot/module/device/method/adb.py
2024-04-14 19:11:32 +08:00

320 lines
10 KiB
Python

import re
from functools import wraps
import cv2
import numpy as np
import time
from adbutils.errors import AdbError
from lxml import etree
from module.base.decorator import Config
from module.device.connection import Connection
from module.device.method.utils import (RETRY_TRIES, retry_sleep, remove_prefix, handle_adb_error,
ImageTruncated, PackageNotInstalled)
from module.exception import RequestHumanTakeover, ScriptError
from module.logger import logger
def retry(func):
    """
    Decorator for `Adb` methods that re-runs the method up to RETRY_TRIES times.

    After a failed attempt a recovery action is remembered. Before the next attempt
    the wrapper calls `retry_sleep()` with the attempt index and then runs that action.
    `RequestHumanTakeover`, and any `AdbError` that `handle_adb_error()` rejects,
    stop the loop immediately.

    Raises:
        RequestHumanTakeover: If no attempt returned successfully.
    """

    @wraps(func)
    def retry_wrapper(self, *args, **kwargs):
        """
        Args:
            self (Adb):
        """

        # Recovery actions, executed inside the `try` of the following attempt
        def reconnect():
            self.adb_reconnect()

        def redetect():
            self.detect_package()

        def nothing():
            pass

        recover = None
        for attempt in range(RETRY_TRIES):
            try:
                # `recover` is only set once a previous attempt has failed
                if callable(recover):
                    retry_sleep(attempt)
                    recover()
                return func(self, *args, **kwargs)
            except RequestHumanTakeover:
                # Can't handle
                break
            except ConnectionResetError as e:
                # When adb server was killed
                logger.error(e)
                recover = reconnect
            except AdbError as e:
                # Only retry the errors that handle_adb_error() accepts
                if not handle_adb_error(e):
                    break
                recover = reconnect
            except PackageNotInstalled as e:
                logger.error(e)
                recover = redetect
            except ImageTruncated as e:
                # Nothing to repair, just sleep and try again
                logger.error(e)
                recover = nothing
            except Exception as e:
                # Unknown error, log with traceback and try again
                logger.exception(e)
                recover = nothing

        logger.critical(f'Retry {func.__name__}() failed')
        raise RequestHumanTakeover

    return retry_wrapper
def load_screencap(data):
    """
    Decode the raw (non-PNG) output of `screencap` into an image.

    Args:
        data (bytes): Raw data from `screencap`. The first 12 bytes are read as three
            uint32 values: width, height and a third field that is not used here.

    Returns:
        np.ndarray: Image with the 4th channel removed by cv2.COLOR_BGRA2BGR.

    Raises:
        ImageTruncated: If `data` is too short to hold the header, or does not
            contain width * height * 4 bytes of pixels.
    """
    header_size = 12
    if len(data) < header_size:
        # Without this guard a short buffer raises a bare ValueError from
        # np.frombuffer() or from unpacking the header.
        raise ImageTruncated(f'Screencap data too short for header: {len(data)} bytes')

    # Load data
    header = np.frombuffer(data[0:header_size], dtype=np.uint32)
    channel = 4  # screencap sends an RGBA image
    # Usually to be 1280, 720
    # Convert to Python int so the size arithmetic below cannot wrap around in uint32
    width, height = int(header[0]), int(header[1])
    pixel_bytes = width * height * channel

    image = np.frombuffer(data, dtype=np.uint8)
    try:
        # Pixels are taken from the end of the buffer, so whatever precedes them is skipped
        image = image[-pixel_bytes:].reshape(height, width, channel)
    except ValueError as e:
        # ValueError: cannot reshape array of size 0 into shape (720,1280,4)
        raise ImageTruncated(str(e))

    # Drop the alpha channel
    image = cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
    if image is None:
        raise ImageTruncated('Empty image after cv2.cvtColor')

    return image
class Adb(Connection):
    """
    Device control implemented with plain `adb shell` commands:
    screenshots, click/swipe input, app start/stop and UI hierarchy dump.
    """
    # Decoding methods accepted by __load_screenshot()
    __screenshot_method = [0, 1, 2]
    # Order in which the methods are tried, the last method that worked goes first
    __screenshot_method_fixed = [0, 1, 2]

    @staticmethod
    def __load_screenshot(screenshot, method):
        """
        Decode the output of `screencap -p`.

        Args:
            screenshot (bytes): Raw output of `screencap -p`.
            method (int): Newline fix to apply before decoding.
                0: use data as is.
                1: replace b'\\r\\n' with b'\\n'.
                2: replace b'\\r\\r\\n' with b'\\n'.

        Returns:
            np.ndarray: Decoded image, converted with cv2.COLOR_BGR2RGB.

        Raises:
            ScriptError: If `method` is unknown.
            ImageTruncated: If the data is empty or can't be decoded.
        """
        if method == 0:
            pass
        elif method == 1:
            screenshot = screenshot.replace(b'\r\n', b'\n')
        elif method == 2:
            screenshot = screenshot.replace(b'\r\r\n', b'\n')
        else:
            raise ScriptError(f'Unknown method to load screenshots: {method}')

        # Compatibility fix for VMOS Pro.
        # Its `screencap` output carries an extra header line that emulators don't send,
        # which breaks image decoding, so remove it if present.
        screenshot = remove_prefix(screenshot, b'long long=8 fun*=10\n')

        image = np.frombuffer(screenshot, np.uint8)
        # np.frombuffer() never returns None, an empty buffer gives an empty array
        if image.size == 0:
            raise ImageTruncated('Empty image after reading from buffer')

        image = cv2.imdecode(image, cv2.IMREAD_COLOR)
        if image is None:
            raise ImageTruncated('Empty image after cv2.imdecode')

        # Convert in place
        cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)

        return image

    def __process_screenshot(self, screenshot):
        """
        Decode a `screencap -p` result, trying each decoding method in turn.
        The method that succeeds is moved to the front for the next call.

        Args:
            screenshot (bytes): Raw output of `screencap -p`.

        Returns:
            np.ndarray:

        Raises:
            OSError: If no method can decode the data.
        """
        for method in self.__screenshot_method_fixed:
            try:
                result = self.__load_screenshot(screenshot, method=method)
                self.__screenshot_method_fixed = [method] + self.__screenshot_method
                return result
            except (OSError, ImageTruncated):
                continue

        # All methods failed, restore the default order
        self.__screenshot_method_fixed = self.__screenshot_method
        if len(screenshot) < 500:
            logger.warning(f'Unexpected screenshot: {screenshot}')
        raise OSError('cannot load screenshot')

    @retry
    @Config.when(DEVICE_OVER_HTTP=False)
    def screenshot_adb(self):
        """
        Take a screenshot with `screencap -p` and decode the PNG data.

        Returns:
            np.ndarray:
        """
        data = self.adb_shell(['screencap', '-p'], stream=True)
        if len(data) < 500:
            logger.warning(f'Unexpected screenshot: {data}')

        return self.__process_screenshot(data)

    @retry
    @Config.when(DEVICE_OVER_HTTP=True)
    def screenshot_adb(self):
        """
        Take a screenshot with `screencap` and decode the raw data.

        Returns:
            np.ndarray:
        """
        data = self.adb_shell(['screencap'], stream=True)
        if len(data) < 500:
            logger.warning(f'Unexpected screenshot: {data}')

        return load_screencap(data)

    @retry
    def screenshot_adb_nc(self):
        """
        Take a screenshot with `screencap`, fetched through `adb_shell_nc()`,
        and decode the raw data.

        Returns:
            np.ndarray:
        """
        data = self.adb_shell_nc(['screencap'])
        if len(data) < 500:
            logger.warning(f'Unexpected screenshot: {data}')

        return load_screencap(data)

    @retry
    def click_adb(self, x, y):
        """
        Tap with `input tap`.

        Args:
            x: X coordinate.
            y: Y coordinate.
        """
        start = time.time()
        self.adb_shell(['input', 'tap', x, y])
        # If the command returned within 0.05s, wait for another 0.05s
        if time.time() - start <= 0.05:
            self.sleep(0.05)

    @retry
    def swipe_adb(self, p1, p2, duration=0.1):
        """
        Swipe with `input swipe`.

        Args:
            p1: Start point, unpacked into the command.
            p2: End point, unpacked into the command.
            duration (int, float): Multiplied by 1000 and truncated to int
                before being passed to `input swipe`.
        """
        duration = int(duration * 1000)
        self.adb_shell(['input', 'swipe', *p1, *p2, duration])

    @retry
    def app_current_adb(self):
        """
        Copied from uiautomator2

        Returns:
            str: Package name.

        Raises:
            OSError

        For developer:
            Function reset_uiautomator need this function, so can't use jsonrpc here.
        """
        # Related issue: https://github.com/openatx/uiautomator2/issues/200
        # $ adb shell dumpsys window windows
        # Example output:
        #   mCurrentFocus=Window{41b37570 u0 com.incall.apps.launcher/com.incall.apps.launcher.Launcher}
        #   mFocusedApp=AppWindowToken{422df168 token=Token{422def98 ActivityRecord{422dee38 u0 com.example/.UI.play.PlayActivity t14}}}
        # Regexp
        #   r'mFocusedApp=.*ActivityRecord{\w+ \w+ (?P<package>.*)/(?P<activity>.*) .*'
        #   r'mCurrentFocus=Window{\w+ \w+ (?P<package>.*)/(?P<activity>.*)\}')
        _focusedRE = re.compile(
            r'mCurrentFocus=Window{.*\s+(?P<package>[^\s]+)/(?P<activity>[^\s]+)\}'
        )
        m = _focusedRE.search(self.adb_shell(['dumpsys', 'window', 'windows']))
        if m:
            return m.group('package')

        # try: adb shell dumpsys activity top
        _activityRE = re.compile(
            r'ACTIVITY (?P<package>[^\s]+)/(?P<activity>[^/\s]+) \w+ pid=(?P<pid>\d+)'
        )
        output = self.adb_shell(['dumpsys', 'activity', 'top'])
        ms = _activityRE.finditer(output)
        ret = None
        for m in ms:
            ret = m.group('package')
        if ret:  # get last result
            return ret
        raise OSError("Couldn't get focused app")

    @retry
    def app_start_adb(self, package_name=None, allow_failure=False):
        """
        Start an app with `monkey`. If `monkey` is not available on the device,
        look up the launcher activity with `dumpsys package` and start it with `am start`.

        Args:
            package_name (str): Defaults to `self.package` if empty.
            allow_failure (bool): True to return False instead of raising
                when the app can't be started.

        Returns:
            bool: If success to start

        Raises:
            PackageNotInstalled: If no launcher activity is found and `allow_failure` is False.
        """
        if not package_name:
            package_name = self.package
        result = self.adb_shell([
            'monkey', '-p', package_name, '-c',
            'android.intent.category.LAUNCHER', '--pct-syskeys', '0', '1'
        ])
        if 'No activities found' in result:
            # ** No activities found to run, monkey aborted.
            if allow_failure:
                return False
            else:
                logger.error(result)
                raise PackageNotInstalled(package_name)
        elif 'inaccessible' in result:
            # /system/bin/sh: monkey: inaccessible or not found
            pass
        else:
            # Events injected: 1
            # ## Network stats: elapsed time=4ms (0ms mobile, 0ms wifi, 4ms not connected)
            return True

        # `monkey` is unavailable, find the launcher activity and start it directly
        result = self.adb_shell(['dumpsys', 'package', package_name])
        res = re.search(r'android.intent.action.MAIN:\s+\w+ ([\w.\/]+) filter \w+\s+'
                        r'.*\s+Category: "android.intent.category.LAUNCHER"',
                        result)
        if res:
            activity_name = res.group(1)
        else:
            if allow_failure:
                return False
            else:
                logger.error(result)
                raise PackageNotInstalled(package_name)
        self.adb_shell(['am', 'start', '-a', 'android.intent.action.MAIN', '-c',
                        'android.intent.category.LAUNCHER', '-n', activity_name])
        # Report success explicitly, otherwise this path returns None which is falsy
        return True

    @retry
    def app_stop_adb(self, package_name=None):
        """ Stop one application: am force-stop"""
        if not package_name:
            package_name = self.package
        self.adb_shell(['am', 'force-stop', package_name])

    @retry
    def dump_hierarchy_adb(self, temp: str = '/data/local/tmp/hierarchy.xml') -> etree._Element:
        """
        Dump the UI hierarchy with `uiautomator dump` and parse it.

        Args:
            temp (str): Temp file store on emulator.

        Returns:
            etree._Element:
        """
        # Remove existing file
        # self.adb_shell(['rm', '/data/local/tmp/hierarchy.xml'])

        # Dump hierarchy
        for _ in range(2):
            response = self.adb_shell(['uiautomator', 'dump', '--compressed', temp])
            # 'hierchary' is the spelling matched against the command output, don't correct it
            if 'hierchary' in response:
                # UI hierchary dumped to: /data/local/tmp/hierarchy.xml
                break
            else:
                # <None>
                # Must kill uiautomator2
                self.app_stop_adb('com.github.uiautomator')
                self.app_stop_adb('com.github.uiautomator.test')

        # Read from device
        chunks = []
        for chunk in self.adb.sync.iter_content(temp):
            if chunk:
                chunks.append(chunk)
            else:
                break
        content = b''.join(chunks)

        # Parse with lxml
        hierarchy = etree.fromstring(content)
        return hierarchy