mirror of
https://github.com/LmeSzinc/StarRailCopilot.git
synced 2024-11-30 03:16:08 +00:00
189 lines
5.9 KiB
Python
189 lines
5.9 KiB
Python
|
import sys
|
||
|
from collections import deque
|
||
|
|
||
|
from module.base.timer import Timer
|
||
|
from module.device.app_control import AppControl
|
||
|
from module.device.control import Control
|
||
|
from module.device.screenshot import Screenshot
|
||
|
from module.exception import (
|
||
|
EmulatorNotRunningError,
|
||
|
GameNotRunningError,
|
||
|
GameStuckError,
|
||
|
GameTooManyClickError,
|
||
|
RequestHumanTakeover
|
||
|
)
|
||
|
from module.logger import logger
|
||
|
|
||
|
if sys.platform == 'win32':
|
||
|
from module.device.platform.platform_windows import PlatformWindows as Platform
|
||
|
else:
|
||
|
from module.device.platform.platform_base import PlatformBase as Platform
|
||
|
|
||
|
|
||
|
class Device(Screenshot, Control, AppControl, Platform):
|
||
|
_screen_size_checked = False
|
||
|
detect_record = set()
|
||
|
click_record = deque(maxlen=15)
|
||
|
stuck_timer = Timer(60, count=60).start()
|
||
|
|
||
|
def __init__(self, *args, **kwargs):
|
||
|
for _ in range(2):
|
||
|
try:
|
||
|
super().__init__(*args, **kwargs)
|
||
|
break
|
||
|
except EmulatorNotRunningError:
|
||
|
# Try to start emulator
|
||
|
if self.emulator_instance is not None:
|
||
|
self.emulator_start()
|
||
|
else:
|
||
|
logger.critical(
|
||
|
f'No emulator with serial "{self.config.Emulator_Serial}" found, '
|
||
|
f'please set a correct serial'
|
||
|
)
|
||
|
raise
|
||
|
|
||
|
self.screenshot_interval_set()
|
||
|
|
||
|
# Auto-select the fastest screenshot method
|
||
|
if not self.config.is_template_config and self.config.Emulator_ScreenshotMethod == 'auto':
|
||
|
self.run_simple_screenshot_benchmark()
|
||
|
|
||
|
def run_simple_screenshot_benchmark(self):
|
||
|
"""
|
||
|
Perform a screenshot method benchmark, test 3 times on each method.
|
||
|
The fastest one will be set into config.
|
||
|
"""
|
||
|
logger.info('run_simple_screenshot_benchmark')
|
||
|
# Check resolution first
|
||
|
self.resolution_check_uiautomator2()
|
||
|
# Perform benchmark
|
||
|
from module.daemon.benchmark import Benchmark
|
||
|
bench = Benchmark(config=self.config, device=self)
|
||
|
method = bench.run_simple_screenshot_benchmark()
|
||
|
# Set
|
||
|
self.config.Emulator_ScreenshotMethod = method
|
||
|
|
||
|
def screenshot(self):
|
||
|
"""
|
||
|
Returns:
|
||
|
np.ndarray:
|
||
|
"""
|
||
|
self.stuck_record_check()
|
||
|
|
||
|
try:
|
||
|
super().screenshot()
|
||
|
except RequestHumanTakeover:
|
||
|
if not self.ascreencap_available:
|
||
|
logger.error('aScreenCap unavailable on current device, fallback to auto')
|
||
|
self.run_simple_screenshot_benchmark()
|
||
|
super().screenshot()
|
||
|
else:
|
||
|
raise
|
||
|
|
||
|
return self.image
|
||
|
|
||
|
def release_during_wait(self):
|
||
|
# Scrcpy server is still sending video stream,
|
||
|
# stop it during wait
|
||
|
if self.config.Emulator_ScreenshotMethod == 'scrcpy':
|
||
|
self._scrcpy_server_stop()
|
||
|
|
||
|
def stuck_record_add(self, button):
|
||
|
self.detect_record.add(str(button))
|
||
|
|
||
|
def stuck_record_clear(self):
|
||
|
self.detect_record = set()
|
||
|
self.stuck_timer.reset()
|
||
|
|
||
|
def stuck_record_check(self):
|
||
|
"""
|
||
|
Raises:
|
||
|
GameStuckError:
|
||
|
"""
|
||
|
reached = self.stuck_timer.reached()
|
||
|
if not reached:
|
||
|
return False
|
||
|
|
||
|
logger.warning('Wait too long')
|
||
|
logger.warning(f'Waiting for {self.detect_record}')
|
||
|
self.stuck_record_clear()
|
||
|
|
||
|
if self.app_is_running():
|
||
|
raise GameStuckError(f'Wait too long')
|
||
|
else:
|
||
|
raise GameNotRunningError('Game died')
|
||
|
|
||
|
def handle_control_check(self, button):
|
||
|
self.stuck_record_clear()
|
||
|
self.click_record_add(button)
|
||
|
self.click_record_check()
|
||
|
|
||
|
def click_record_add(self, button):
|
||
|
self.click_record.append(str(button))
|
||
|
|
||
|
def click_record_clear(self):
|
||
|
self.click_record.clear()
|
||
|
|
||
|
def click_record_remove(self, button):
|
||
|
"""
|
||
|
Remove a button from `click_record`
|
||
|
|
||
|
Args:
|
||
|
button (Button):
|
||
|
|
||
|
Returns:
|
||
|
int: Number of button removed
|
||
|
"""
|
||
|
removed = 0
|
||
|
for _ in range(self.click_record.maxlen):
|
||
|
try:
|
||
|
self.click_record.remove(str(button))
|
||
|
removed += 1
|
||
|
except ValueError:
|
||
|
# Value not in queue
|
||
|
break
|
||
|
|
||
|
return removed
|
||
|
|
||
|
def click_record_check(self):
|
||
|
"""
|
||
|
Raises:
|
||
|
GameTooManyClickError:
|
||
|
"""
|
||
|
count = {}
|
||
|
for key in self.click_record:
|
||
|
count[key] = count.get(key, 0) + 1
|
||
|
count = sorted(count.items(), key=lambda item: item[1])
|
||
|
if count[0][1] >= 12:
|
||
|
logger.warning(f'Too many click for a button: {count[0][0]}')
|
||
|
logger.warning(f'History click: {[str(prev) for prev in self.click_record]}')
|
||
|
self.click_record_clear()
|
||
|
raise GameTooManyClickError(f'Too many click for a button: {count[0][0]}')
|
||
|
if len(count) >= 2 and count[0][1] >= 6 and count[1][1] >= 6:
|
||
|
logger.warning(f'Too many click between 2 buttons: {count[0][0]}, {count[1][0]}')
|
||
|
logger.warning(f'History click: {[str(prev) for prev in self.click_record]}')
|
||
|
self.click_record_clear()
|
||
|
raise GameTooManyClickError(f'Too many click between 2 buttons: {count[0][0]}, {count[1][0]}')
|
||
|
|
||
|
def disable_stuck_detection(self):
|
||
|
"""
|
||
|
Disable stuck detection and its handler. Usually uses in semi auto and debugging.
|
||
|
"""
|
||
|
logger.info('Disable stuck detection')
|
||
|
|
||
|
def empty_function(*arg, **kwargs):
|
||
|
return False
|
||
|
|
||
|
self.click_record_check = empty_function
|
||
|
self.stuck_record_check = empty_function
|
||
|
|
||
|
def app_start(self):
|
||
|
super().app_start()
|
||
|
self.stuck_record_clear()
|
||
|
self.click_record_clear()
|
||
|
|
||
|
def app_stop(self):
|
||
|
super().app_stop()
|
||
|
self.stuck_record_clear()
|
||
|
self.click_record_clear()
|