StarRailCopilot/module/device/platform/platform_windows.py
2024-07-25 01:24:04 +08:00

334 lines
12 KiB
Python

import ctypes
import re
import subprocess
import psutil
from deploy.Windows.utils import DataProcessInfo
from module.base.decorator import run_once
from module.base.timer import Timer
from module.device.connection import AdbDeviceWithStatus
from module.device.platform.platform_base import PlatformBase
from module.device.platform.emulator_windows import Emulator, EmulatorInstance, EmulatorManager
from module.logger import logger
class EmulatorUnknown(Exception):
pass
def get_focused_window():
return ctypes.windll.user32.GetForegroundWindow()
def set_focus_window(hwnd):
ctypes.windll.user32.SetForegroundWindow(hwnd)
def minimize_window(hwnd):
ctypes.windll.user32.ShowWindow(hwnd, 6)
def get_window_title(hwnd):
"""Returns the window title as a string."""
text_len_in_characters = ctypes.windll.user32.GetWindowTextLengthW(hwnd)
string_buffer = ctypes.create_unicode_buffer(
text_len_in_characters + 1) # +1 for the \0 at the end of the null-terminated string.
ctypes.windll.user32.GetWindowTextW(hwnd, string_buffer, text_len_in_characters + 1)
return string_buffer.value
def flash_window(hwnd, flash=True):
ctypes.windll.user32.FlashWindow(hwnd, flash)
class PlatformWindows(PlatformBase, EmulatorManager):
@classmethod
def execute(cls, command):
"""
Args:
command (str):
Returns:
subprocess.Popen:
"""
command = command.replace(r"\\", "/").replace("\\", "/").replace('"', '"')
logger.info(f'Execute: {command}')
return subprocess.Popen(command, close_fds=True) # only work on Windows
@classmethod
def kill_process_by_regex(cls, regex: str) -> int:
"""
Kill processes with cmdline match the given regex.
Args:
regex:
Returns:
int: Number of processes killed
"""
count = 0
for proc in psutil.process_iter():
cmdline = DataProcessInfo(proc=proc, pid=proc.pid).cmdline
if re.search(regex, cmdline):
logger.info(f'Kill emulator: {cmdline}')
proc.kill()
count += 1
return count
def _emulator_start(self, instance: EmulatorInstance):
"""
Start a emulator without error handling
"""
exe = instance.emulator.path
if instance == Emulator.MuMuPlayer:
# NemuPlayer.exe
self.execute(exe)
elif instance == Emulator.MuMuPlayerX:
# NemuPlayer.exe -m nemu-12.0-x64-default
self.execute(f'"{exe}" -m {instance.name}')
elif instance == Emulator.MuMuPlayer12:
# MuMuPlayer.exe -v 0
if instance.MuMuPlayer12_id is None:
logger.warning(f'Cannot get MuMu instance index from name {instance.name}')
self.execute(f'"{exe}" -v {instance.MuMuPlayer12_id}')
elif instance == Emulator.LDPlayerFamily:
# ldconsole.exe launch --index 0
self.execute(f'"{Emulator.single_to_console(exe)}" launch --index {instance.LDPlayer_id}')
elif instance == Emulator.NoxPlayerFamily:
# Nox.exe -clone:Nox_1
self.execute(f'"{exe}" -clone:{instance.name}')
elif instance == Emulator.BlueStacks5:
# HD-Player.exe --instance Pie64
self.execute(f'"{exe}" --instance {instance.name}')
elif instance == Emulator.BlueStacks4:
# BlueStacks\Client\Bluestacks.exe -vmname Android_1
self.execute(f'"{exe}" -vmname {instance.name}')
else:
raise EmulatorUnknown(f'Cannot start an unknown emulator instance: {instance}')
def _emulator_stop(self, instance: EmulatorInstance):
"""
Stop a emulator without error handling
"""
logger.hr('Emulator stop', level=2)
exe = instance.emulator.path
if instance == Emulator.MuMuPlayer:
# MuMu6 does not have multi instance, kill one means kill all
# Has 4 processes
# "C:\Program Files\NemuVbox\Hypervisor\NemuHeadless.exe" --comment nemu-6.0-x64-default --startvm
# "E:\ProgramFiles\MuMu\emulator\nemu\EmulatorShell\NemuPlayer.exe"
# E:\ProgramFiles\MuMu\emulator\nemu\EmulatorShell\NemuService.exe
# "C:\Program Files\NemuVbox\Hypervisor\NemuSVC.exe" -Embedding
self.kill_process_by_regex(
rf'('
rf'NemuHeadless.exe'
rf'|NemuPlayer.exe\"'
rf'|NemuPlayer.exe$'
rf'|NemuService.exe'
rf'|NemuSVC.exe'
rf')'
)
elif instance == Emulator.MuMuPlayerX:
# MuMu X has 3 processes
# "E:\ProgramFiles\MuMu9\emulator\nemu9\EmulatorShell\NemuPlayer.exe" -m nemu-12.0-x64-default -s 0 -l
# "C:\Program Files\Muvm6Vbox\Hypervisor\Muvm6Headless.exe" --comment nemu-12.0-x64-default --startvm xxx
# "C:\Program Files\Muvm6Vbox\Hypervisor\Muvm6SVC.exe" --Embedding
self.kill_process_by_regex(
rf'('
rf'NemuPlayer.exe.*-m {instance.name}'
rf'|Muvm6Headless.exe'
rf'|Muvm6SVC.exe'
rf')'
)
elif instance == Emulator.MuMuPlayer12:
# MuMuManager.exe api -v 1 shutdown_player
if instance.MuMuPlayer12_id is None:
logger.warning(f'Cannot get MuMu instance index from name {instance.name}')
self.execute(f'"{Emulator.single_to_console(exe)}" api -v {instance.MuMuPlayer12_id} shutdown_player')
elif instance == Emulator.LDPlayerFamily:
# ldconsole.exe quit --index 0
self.execute(f'"{Emulator.single_to_console(exe)}" quit --index {instance.LDPlayer_id}')
elif instance == Emulator.NoxPlayerFamily:
# Nox.exe -clone:Nox_1 -quit
self.execute(f'"{exe}" -clone:{instance.name} -quit')
elif instance == Emulator.BlueStacks5:
# BlueStack has 2 processes
# C:\Program Files\BlueStacks_nxt_cn\HD-Player.exe --instance Pie64
# C:\Program Files\BlueStacks_nxt_cn\BstkSVC.exe -Embedding
self.kill_process_by_regex(
rf'('
rf'MuMuVMMHeadless.exe.*--comment {instance.name}'
rf'|MuMuPlayer.exe.*-v {instance.MuMuPlayer12_id}'
rf')'
)
# There is also a shared service, no need to kill it
# "C:\Program Files\MuMuVMMVbox\Hypervisor\MuMuVMMSVC.exe" --Embedding
elif instance == Emulator.NoxPlayerFamily:
# Nox.exe -clone:Nox_1 -quit
self.execute(f'"{exe}" -clone:{instance.name} -quit')
else:
raise EmulatorUnknown(f'Cannot stop an unknown emulator instance: {instance}')
def _emulator_function_wrapper(self, func):
"""
Args:
func (callable): _emulator_start or _emulator_stop
Returns:
bool: If success
"""
try:
func(self.emulator_instance)
return True
except OSError as e:
msg = str(e)
# OSError: [WinError 740] 请求的操作需要提升。
if 'WinError 740' in msg:
logger.error('To start/stop MumuAppPlayer, ALAS needs to be run as administrator')
except EmulatorUnknown as e:
logger.error(e)
except Exception as e:
logger.exception(e)
logger.error(f'Emulator function {func.__name__}() failed')
return False
def emulator_start_watch(self):
"""
Returns:
bool: True if startup completed
False if timeout
"""
logger.hr('Emulator start', level=2)
current_window = get_focused_window()
serial = self.emulator_instance.serial
logger.info(f'Current window: {current_window}')
def adb_connect():
m = self.adb_client.connect(self.serial)
if 'connected' in m:
# Connected to 127.0.0.1:59865
# Already connected to 127.0.0.1:59865
return False
elif '(10061)' in m:
# cannot connect to 127.0.0.1:55555:
# No connection could be made because the target machine actively refused it. (10061)
return False
else:
return True
@run_once
def show_online(m):
logger.info(f'Emulator online: {m}')
@run_once
def show_ping(m):
logger.info(f'Command ping: {m}')
@run_once
def show_package(m):
logger.info(f'Found azurlane packages: {m}')
interval = Timer(0.5).start()
timeout = Timer(180).start()
new_window = 0
while 1:
interval.wait()
interval.reset()
if timeout.reached():
logger.warning(f'Emulator start timeout')
return False
# Check emulator window showing up
# logger.info([get_focused_window(), get_window_title(get_focused_window())])
if current_window != 0 and new_window == 0:
new_window = get_focused_window()
if current_window != new_window:
logger.info(f'New window showing up: {new_window}, focus back')
set_focus_window(current_window)
else:
new_window = 0
# Check device connection
devices = self.list_device().select(serial=serial)
# logger.info(devices)
if devices:
device: AdbDeviceWithStatus = devices.first_or_none()
if device.status == 'device':
# Emulator online
pass
if device.status == 'offline':
self.adb_client.disconnect(serial)
adb_connect()
continue
else:
# Try to connect
adb_connect()
continue
show_online(devices.first_or_none())
# Check command availability
try:
pong = self.adb_shell(['echo', 'pong'])
except Exception as e:
logger.info(e)
continue
show_ping(pong)
# Check azuelane package
packages = self.list_known_packages(show_log=False)
if len(packages):
pass
else:
continue
show_package(packages)
# All check passed
break
if new_window != 0 and new_window != current_window:
logger.info(f'Minimize new window: {new_window}')
minimize_window(new_window)
if current_window:
logger.info(f'De-flash current window: {current_window}')
flash_window(current_window, flash=False)
if new_window:
logger.info(f'Flash new window: {new_window}')
flash_window(new_window, flash=True)
logger.info('Emulator start completed')
return True
def emulator_start(self):
logger.hr('Emulator start', level=1)
for _ in range(3):
# Stop
if not self._emulator_function_wrapper(self._emulator_stop):
return False
# Start
if self._emulator_function_wrapper(self._emulator_start):
# Success
self.emulator_start_watch()
return True
else:
# Failed to start, stop and start again
if self._emulator_function_wrapper(self._emulator_stop):
continue
else:
return False
logger.error('Failed to start emulator 3 times, stopped')
return False
def emulator_stop(self):
logger.hr('Emulator stop', level=1)
return self._emulator_function_wrapper(self._emulator_stop)
if __name__ == '__main__':
self = PlatformWindows('alas')
d = self.emulator_instance
print(d)