mirror of
https://github.com/LmeSzinc/StarRailCopilot.git
synced 2024-11-25 10:01:10 +00:00
211 lines
7.5 KiB
Python
211 lines
7.5 KiB
Python
import os
|
|
from functools import wraps
|
|
|
|
import lz4.block
|
|
from adbutils.errors import AdbError
|
|
|
|
from module.base.utils import *
|
|
from module.device.connection import Connection
|
|
from module.device.method.utils import (ImageTruncated, RETRY_TRIES, handle_adb_error, handle_unknown_host_service,
|
|
retry_sleep)
|
|
from module.exception import RequestHumanTakeover, ScriptError
|
|
from module.logger import logger
|
|
|
|
|
|
class AscreencapError(Exception):
|
|
pass
|
|
|
|
|
|
def retry(func):
|
|
@wraps(func)
|
|
def retry_wrapper(self, *args, **kwargs):
|
|
"""
|
|
Args:
|
|
self (AScreenCap):
|
|
"""
|
|
init = None
|
|
for _ in range(RETRY_TRIES):
|
|
try:
|
|
if callable(init):
|
|
retry_sleep(_)
|
|
init()
|
|
return func(self, *args, **kwargs)
|
|
# Can't handle
|
|
except RequestHumanTakeover:
|
|
break
|
|
# When adb server was killed
|
|
except ConnectionResetError as e:
|
|
logger.error(e)
|
|
|
|
def init():
|
|
self.adb_reconnect()
|
|
# When ascreencap is not installed
|
|
except AscreencapError as e:
|
|
logger.error(e)
|
|
|
|
def init():
|
|
self.ascreencap_init()
|
|
# AdbError
|
|
except AdbError as e:
|
|
if handle_adb_error(e):
|
|
def init():
|
|
self.adb_reconnect()
|
|
elif handle_unknown_host_service(e):
|
|
def init():
|
|
self.adb_start_server()
|
|
self.adb_reconnect()
|
|
else:
|
|
break
|
|
# ImageTruncated
|
|
except ImageTruncated as e:
|
|
logger.error(e)
|
|
|
|
def init():
|
|
pass
|
|
# Unknown
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
|
|
def init():
|
|
pass
|
|
|
|
logger.critical(f'Retry {func.__name__}() failed')
|
|
raise RequestHumanTakeover
|
|
|
|
return retry_wrapper
|
|
|
|
|
|
class AScreenCap(Connection):
|
|
__screenshot_method = [0, 1, 2]
|
|
__screenshot_method_fixed = [0, 1, 2]
|
|
__bytepointer = 0
|
|
ascreencap_available = True
|
|
|
|
def ascreencap_init(self):
|
|
logger.hr('aScreenCap init')
|
|
self.__bytepointer = 0
|
|
self.ascreencap_available = True
|
|
|
|
arc = self.cpu_abi
|
|
sdk = self.sdk_ver
|
|
logger.info(f'cpu_arc: {arc}, sdk_ver: {sdk}')
|
|
|
|
if sdk in range(21, 26):
|
|
ver = "Android_5.x-7.x"
|
|
elif sdk in range(26, 28):
|
|
ver = "Android_8.x"
|
|
elif sdk == 28:
|
|
ver = "Android_9.x"
|
|
else:
|
|
ver = "0"
|
|
filepath = os.path.join(self.config.ASCREENCAP_FILEPATH_LOCAL, ver, arc, 'ascreencap')
|
|
if not os.path.exists(filepath):
|
|
self.ascreencap_available = False
|
|
logger.error('No suitable version of aScreenCap lib available for this device, '
|
|
'please use other screenshot methods instead')
|
|
raise RequestHumanTakeover
|
|
|
|
logger.info(f'pushing {filepath}')
|
|
self.adb_push(filepath, self.config.ASCREENCAP_FILEPATH_REMOTE)
|
|
|
|
logger.info(f'chmod 0777 {self.config.ASCREENCAP_FILEPATH_REMOTE}')
|
|
self.adb_shell(['chmod', '0777', self.config.ASCREENCAP_FILEPATH_REMOTE])
|
|
|
|
def uninstall_ascreencap(self):
|
|
logger.info('Removing ascreencap')
|
|
self.adb_shell(['rm', self.config.ASCREENCAP_FILEPATH_REMOTE])
|
|
|
|
def _ascreencap_reposition_byte_pointer(self, byte_array):
|
|
"""Method to return the sanitized version of ascreencap stdout for devices
|
|
that suffers from linker warnings. The correct pointer location will be saved
|
|
for subsequent screen refreshes
|
|
"""
|
|
while byte_array[self.__bytepointer:self.__bytepointer + 4] != b'BMZ1':
|
|
self.__bytepointer += 1
|
|
if self.__bytepointer >= len(byte_array):
|
|
text = 'Repositioning byte pointer failed, corrupted aScreenCap data received'
|
|
logger.warning(text)
|
|
if len(byte_array) < 500:
|
|
logger.warning(f'Unexpected screenshot: {byte_array}')
|
|
raise AscreencapError(text)
|
|
return byte_array[self.__bytepointer:]
|
|
|
|
def __load_screenshot(self, screenshot, method):
|
|
if method == 0:
|
|
return screenshot
|
|
elif method == 1:
|
|
return screenshot.replace(b'\r\n', b'\n')
|
|
elif method == 2:
|
|
return screenshot.replace(b'\r\r\n', b'\n')
|
|
else:
|
|
raise ScriptError(f'Unknown method to load screenshots: {method}')
|
|
|
|
def __uncompress(self, screenshot):
|
|
raw_compressed_data = self._ascreencap_reposition_byte_pointer(screenshot)
|
|
|
|
# See headers in:
|
|
# https://github.com/ClnViewer/Android-fast-screen-capture#streamimage-compressed---header-format-using
|
|
compressed_data_header = np.frombuffer(raw_compressed_data[0:20], dtype=np.uint32)
|
|
if compressed_data_header[0] != 828001602:
|
|
compressed_data_header = compressed_data_header.byteswap()
|
|
if compressed_data_header[0] != 828001602:
|
|
text = f'aScreenCap header verification failure, corrupted image received. ' \
|
|
f'HEADER IN HEX = {compressed_data_header.tobytes().hex()}'
|
|
logger.warning(text)
|
|
raise AscreencapError(text)
|
|
|
|
_, uncompressed_size, _, width, height = compressed_data_header
|
|
channel = 3
|
|
data = lz4.block.decompress(raw_compressed_data[20:], uncompressed_size=uncompressed_size)
|
|
|
|
image = np.frombuffer(data, dtype=np.uint8)
|
|
if image is None:
|
|
raise ImageTruncated('Empty image after reading from buffer')
|
|
|
|
# Equivalent to cv2.imdecode()
|
|
try:
|
|
image = image[-int(width * height * channel):].reshape(height, width, channel)
|
|
except ValueError as e:
|
|
# ValueError: cannot reshape array of size 0 into shape (720,1280,4)
|
|
raise ImageTruncated(str(e))
|
|
|
|
cv2.flip(image, 0, dst=image)
|
|
if image is None:
|
|
raise ImageTruncated('Empty image after cv2.flip')
|
|
|
|
cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)
|
|
if image is None:
|
|
raise ImageTruncated('Empty image after cv2.cvtColor')
|
|
|
|
return image
|
|
|
|
def __process_screenshot(self, screenshot):
|
|
for method in self.__screenshot_method_fixed:
|
|
try:
|
|
result = self.__load_screenshot(screenshot, method=method)
|
|
result = self.__uncompress(result)
|
|
self.__screenshot_method_fixed = [method] + self.__screenshot_method
|
|
return result
|
|
except lz4.block.LZ4BlockError:
|
|
self.__bytepointer = 0
|
|
continue
|
|
|
|
self.__screenshot_method_fixed = self.__screenshot_method
|
|
if len(screenshot) < 500:
|
|
logger.warning(f'Unexpected screenshot: {screenshot}')
|
|
raise OSError(f'cannot load screenshot')
|
|
|
|
@retry
|
|
def screenshot_ascreencap(self):
|
|
content = self.adb_shell([self.config.ASCREENCAP_FILEPATH_REMOTE, '--pack', '2', '--stdout'], stream=True)
|
|
|
|
return self.__process_screenshot(content)
|
|
|
|
@retry
|
|
def screenshot_ascreencap_nc(self):
|
|
data = self.adb_shell_nc([self.config.ASCREENCAP_FILEPATH_REMOTE, '--pack', '2', '--stdout'])
|
|
if len(data) < 500:
|
|
logger.warning(f'Unexpected screenshot: {data}')
|
|
|
|
return self.__uncompress(data)
|