StarRailCopilot/module/device/device.py

296 lines
10 KiB
Python

import collections
import itertools
from lxml import etree
from module.device.env import IS_WINDOWS
# Patch pkg_resources before importing adbutils and uiautomator2
from module.device.pkg_resources import get_distribution
# Just avoid being removed by import optimization
_ = get_distribution
from module.base.timer import Timer
from module.device.app_control import AppControl
from module.device.control import Control
from module.device.screenshot import Screenshot
from module.exception import (
EmulatorNotRunningError,
GameNotRunningError,
GameStuckError,
GameTooManyClickError,
RequestHumanTakeover
)
from module.logger import logger
def show_function_call():
"""
INFO 21:07:31.554 │ Function calls:
<string> L1 <module>
spawn.py L116 spawn_main()
spawn.py L129 _main()
process.py L314 _bootstrap()
process.py L108 run()
process_manager.py L149 run_process()
alas.py L285 loop()
alas.py L69 run()
src.py L55 rogue()
rogue.py L36 run()
rogue.py L18 rogue_once()
entry.py L335 rogue_world_enter()
path.py L193 rogue_path_select()
"""
import os
import traceback
stack = traceback.extract_stack()
func_list = []
for row in stack:
filename, line_number, function_name, _ = row
filename = os.path.basename(filename)
# /tasks/character/switch.py:64 character_update()
func_list.append([filename, str(line_number), function_name])
max_filename = max([len(row[0]) for row in func_list])
max_linenum = max([len(row[1]) for row in func_list]) + 1
def format_(file, line, func):
file = file.rjust(max_filename, " ")
line = f'L{line}'.rjust(max_linenum, " ")
if not func.startswith('<'):
func = f'{func}()'
return f'{file} {line} {func}'
func_list = [f'\n{format_(*row)}' for row in func_list]
logger.info('Function calls:' + ''.join(func_list))
class Device(Screenshot, Control, AppControl):
_screen_size_checked = False
detect_record = set()
click_record = collections.deque(maxlen=30)
stuck_timer = Timer(60, count=60).start()
def __init__(self, *args, **kwargs):
for trial in range(4):
try:
super().__init__(*args, **kwargs)
break
except EmulatorNotRunningError:
if trial >= 3:
logger.critical('Failed to start emulator after 3 trial')
raise RequestHumanTakeover
# Try to start emulator
if self.emulator_instance is not None:
self.emulator_start()
else:
logger.critical(
f'No emulator with serial "{self.config.Emulator_Serial}" found, '
f'please set a correct serial'
)
raise RequestHumanTakeover
# Auto-fill emulator info
if IS_WINDOWS and self.config.EmulatorInfo_Emulator == 'auto':
_ = self.emulator_instance
# SRC only, use nemu_ipc if available
available = self.nemu_ipc_available()
logger.attr('nemu_ipc_available', available)
if available:
self.config.override(Emulator_ScreenshotMethod='nemu_ipc')
self.screenshot_interval_set()
self.method_check()
# Auto-select the fastest screenshot method
if not self.config.is_template_config and self.config.Emulator_ScreenshotMethod == 'auto':
self.run_simple_screenshot_benchmark()
# Early init
if self.config.is_actual_task:
if self.config.Emulator_ControlMethod == 'MaaTouch':
self.early_maatouch_init()
if self.config.Emulator_ControlMethod == 'minitouch':
self.early_minitouch_init()
def run_simple_screenshot_benchmark(self):
"""
Perform a screenshot method benchmark, test 3 times on each method.
The fastest one will be set into config.
"""
logger.info('run_simple_screenshot_benchmark')
# Check resolution first
self.resolution_check_uiautomator2()
# Perform benchmark
from module.daemon.benchmark import Benchmark
bench = Benchmark(config=self.config, device=self)
method = bench.run_simple_screenshot_benchmark()
# Set
with self.config.multi_set():
self.config.Emulator_ScreenshotMethod = method
# if method == 'nemu_ipc':
# self.config.Emulator_ControlMethod = 'nemu_ipc'
def method_check(self):
"""
Check combinations of screenshot method and control methods
"""
# nemu_ipc should be together
# if self.config.Emulator_ScreenshotMethod == 'nemu_ipc' and self.config.Emulator_ControlMethod != 'nemu_ipc':
# logger.warning('When using nemu_ipc, both screenshot and control should use nemu_ipc')
# self.config.Emulator_ControlMethod = 'nemu_ipc'
# if self.config.Emulator_ScreenshotMethod != 'nemu_ipc' and self.config.Emulator_ControlMethod == 'nemu_ipc':
# logger.warning('When not using nemu_ipc, both screenshot and control should not use nemu_ipc')
# self.config.Emulator_ControlMethod = 'minitouch'
# Allow Hermit on VMOS only
if self.config.Emulator_ControlMethod == 'Hermit' and not self.is_vmos:
logger.warning('ControlMethod is allowed on VMOS only')
self.config.Emulator_ControlMethod = 'minitouch'
pass
def screenshot(self):
"""
Returns:
np.ndarray:
"""
self.stuck_record_check()
try:
super().screenshot()
except RequestHumanTakeover:
if not self.ascreencap_available:
logger.error('aScreenCap unavailable on current device, fallback to auto')
self.run_simple_screenshot_benchmark()
super().screenshot()
else:
raise
return self.image
def dump_hierarchy(self) -> etree._Element:
self.stuck_record_check()
return super().dump_hierarchy()
def release_during_wait(self):
# Scrcpy server is still sending video stream,
# stop it during wait
if self.config.Emulator_ScreenshotMethod == 'scrcpy':
self._scrcpy_server_stop()
if self.config.Emulator_ScreenshotMethod == 'nemu_ipc':
self.nemu_ipc_release()
def get_orientation(self):
"""
Callbacks when orientation changed.
"""
o = super().get_orientation()
self.on_orientation_change_maatouch()
return o
def stuck_record_add(self, button):
self.detect_record.add(str(button))
def stuck_record_clear(self):
self.detect_record = set()
self.stuck_timer.reset()
def stuck_record_check(self):
"""
Raises:
GameStuckError:
"""
reached = self.stuck_timer.reached()
if not reached:
return False
show_function_call()
logger.warning('Wait too long')
logger.warning(f'Waiting for {self.detect_record}')
self.stuck_record_clear()
if self.app_is_running():
raise GameStuckError(f'Wait too long')
else:
raise GameNotRunningError('Game died')
def handle_control_check(self, button):
self.stuck_record_clear()
self.click_record_add(button)
self.click_record_check()
def click_record_add(self, button):
self.click_record.append(str(button))
def click_record_clear(self):
self.click_record.clear()
def click_record_remove(self, button):
"""
Remove a button from `click_record`
Args:
button (Button):
Returns:
int: Number of button removed
"""
removed = 0
for _ in range(self.click_record.maxlen):
try:
self.click_record.remove(str(button))
removed += 1
except ValueError:
# Value not in queue
break
return removed
def click_record_check(self):
"""
Raises:
GameTooManyClickError:
"""
first15 = itertools.islice(self.click_record, 0, 15)
count = collections.Counter(first15).most_common(2)
if count[0][1] >= 12:
# Allow more clicks in Ruan Mei event
if 'CHOOSE_OPTION_CONFIRM' in self.click_record and 'BLESSING_CONFIRM' in self.click_record:
count = collections.Counter(self.click_record).most_common(2)
if count[0][0] == 'BLESSING_CONFIRM' and count[0][1] < 25:
return
show_function_call()
logger.warning(f'Too many click for a button: {count[0][0]}')
logger.warning(f'History click: {[str(prev) for prev in self.click_record]}')
self.click_record_clear()
raise GameTooManyClickError(f'Too many click for a button: {count[0][0]}')
if len(count) >= 2 and count[0][1] >= 6 and count[1][1] >= 6:
show_function_call()
logger.warning(f'Too many click between 2 buttons: {count[0][0]}, {count[1][0]}')
logger.warning(f'History click: {[str(prev) for prev in self.click_record]}')
self.click_record_clear()
raise GameTooManyClickError(f'Too many click between 2 buttons: {count[0][0]}, {count[1][0]}')
def disable_stuck_detection(self):
"""
Disable stuck detection and its handler. Usually uses in semi auto and debugging.
"""
logger.info('Disable stuck detection')
def empty_function(*arg, **kwargs):
return False
self.click_record_check = empty_function
self.stuck_record_check = empty_function
def app_start(self):
super().app_start()
self.stuck_record_clear()
self.click_record_clear()
def app_stop(self):
super().app_stop()
self.stuck_record_clear()
self.click_record_clear()