import ctypes import re import subprocess import psutil from deploy.Windows.utils import DataProcessInfo from module.base.decorator import run_once from module.base.timer import Timer from module.device.connection import AdbDeviceWithStatus from module.device.platform.platform_base import PlatformBase from module.device.platform.emulator_windows import Emulator, EmulatorInstance, EmulatorManager from module.logger import logger class EmulatorUnknown(Exception): pass def get_focused_window(): return ctypes.windll.user32.GetForegroundWindow() def set_focus_window(hwnd): ctypes.windll.user32.SetForegroundWindow(hwnd) def minimize_window(hwnd): ctypes.windll.user32.ShowWindow(hwnd, 6) def get_window_title(hwnd): """Returns the window title as a string.""" text_len_in_characters = ctypes.windll.user32.GetWindowTextLengthW(hwnd) string_buffer = ctypes.create_unicode_buffer( text_len_in_characters + 1) # +1 for the \0 at the end of the null-terminated string. ctypes.windll.user32.GetWindowTextW(hwnd, string_buffer, text_len_in_characters + 1) return string_buffer.value def flash_window(hwnd, flash=True): ctypes.windll.user32.FlashWindow(hwnd, flash) class PlatformWindows(PlatformBase, EmulatorManager): @classmethod def execute(cls, command): """ Args: command (str): Returns: subprocess.Popen: """ command = command.replace(r"\\", "/").replace("\\", "/").replace('"', '"') logger.info(f'Execute: {command}') return subprocess.Popen(command, close_fds=True) # only work on Windows @classmethod def kill_process_by_regex(cls, regex: str) -> int: """ Kill processes with cmdline match the given regex. Args: regex: Returns: int: Number of processes killed """ count = 0 for proc in psutil.process_iter(): cmdline = DataProcessInfo(proc=proc, pid=proc.pid).cmdline if re.search(regex, cmdline): logger.info(f'Kill emulator: {cmdline}') proc.kill() count += 1 return count def _emulator_start(self, instance: EmulatorInstance): """ Start a emulator without error handling """ exe = instance.emulator.path if instance == Emulator.MuMuPlayer: # NemuPlayer.exe self.execute(exe) elif instance == Emulator.MuMuPlayerX: # NemuPlayer.exe -m nemu-12.0-x64-default self.execute(f'"{exe}" -m {instance.name}') elif instance == Emulator.MuMuPlayer12: # MuMuPlayer.exe -v 0 if instance.MuMuPlayer12_id is None: logger.warning(f'Cannot get MuMu instance index from name {instance.name}') self.execute(f'"{exe}" -v {instance.MuMuPlayer12_id}') elif instance == Emulator.NoxPlayerFamily: # Nox.exe -clone:Nox_1 self.execute(f'"{exe}" -clone:{instance.name}') elif instance == Emulator.BlueStacks5: # HD-Player.exe -instance Pie64 self.execute(f'"{exe}" -instance {instance.name}') elif instance == Emulator.BlueStacks4: # BlueStacks\Client\Bluestacks.exe -vmname Android_1 self.execute(f'"{exe}" -vmname {instance.name}') else: raise EmulatorUnknown(f'Cannot start an unknown emulator instance: {instance}') def _emulator_stop(self, instance: EmulatorInstance): """ Stop a emulator without error handling """ logger.hr('Emulator stop', level=2) exe = instance.emulator.path if instance == Emulator.MuMuPlayer: # MuMu6 does not have multi instance, kill one means kill all # Has 4 processes # "C:\Program Files\NemuVbox\Hypervisor\NemuHeadless.exe" --comment nemu-6.0-x64-default --startvm # "E:\ProgramFiles\MuMu\emulator\nemu\EmulatorShell\NemuPlayer.exe" # E:\ProgramFiles\MuMu\emulator\nemu\EmulatorShell\NemuService.exe # "C:\Program Files\NemuVbox\Hypervisor\NemuSVC.exe" -Embedding self.kill_process_by_regex( rf'(' rf'NemuHeadless.exe' rf'|NemuPlayer.exe\"' rf'|NemuPlayer.exe$' rf'|NemuService.exe' rf'|NemuSVC.exe' rf')' ) elif instance == Emulator.MuMuPlayerX: # MuMu X has 3 processes # "E:\ProgramFiles\MuMu9\emulator\nemu9\EmulatorShell\NemuPlayer.exe" -m nemu-12.0-x64-default -s 0 -l # "C:\Program Files\Muvm6Vbox\Hypervisor\Muvm6Headless.exe" --comment nemu-12.0-x64-default --startvm xxx # "C:\Program Files\Muvm6Vbox\Hypervisor\Muvm6SVC.exe" --Embedding self.kill_process_by_regex( rf'(' rf'NemuPlayer.exe.*-m {instance.name}' rf'|Muvm6Headless.exe' rf'|Muvm6SVC.exe' rf')' ) elif instance == Emulator.MuMuPlayer12: # MuMu 12 has 2 processes: # E:\ProgramFiles\Netease\MuMuPlayer-12.0\shell\MuMuPlayer.exe -v 0 # "C:\Program Files\MuMuVMMVbox\Hypervisor\MuMuVMMHeadless.exe" --comment MuMuPlayer-12.0-0 --startvm xxx if instance.MuMuPlayer12_id is None: logger.warning(f'Cannot get MuMu instance index from name {instance.name}') self.kill_process_by_regex( rf'(' rf'MuMuVMMHeadless.exe.*--comment {instance.name}' rf'|MuMuPlayer.exe.*-v {instance.MuMuPlayer12_id}' rf')' ) # There is also a shared service, no need to kill it # "C:\Program Files\MuMuVMMVbox\Hypervisor\MuMuVMMSVC.exe" --Embedding elif instance == Emulator.NoxPlayerFamily: # Nox.exe -clone:Nox_1 -quit self.execute(f'"{exe}" -clone:{instance.name} -quit') else: raise EmulatorUnknown(f'Cannot stop an unknown emulator instance: {instance}') def _emulator_function_wrapper(self, func): """ Args: func (callable): _emulator_start or _emulator_stop Returns: bool: If success """ try: func(self.emulator_instance) return True except OSError as e: msg = str(e) # OSError: [WinError 740] 请求的操作需要提升。 if 'WinError 740' in msg: logger.error('To start/stop MumuAppPlayer, ALAS needs to be run as administrator') except EmulatorUnknown as e: logger.error(e) except Exception as e: logger.exception(e) logger.error(f'Emulator function {func.__name__}() failed') return False def emulator_start_watch(self): """ Returns: bool: True if startup completed False if timeout """ logger.hr('Emulator start', level=2) current_window = get_focused_window() serial = self.emulator_instance.serial logger.info(f'Current window: {current_window}') def adb_connect(): m = self.adb_client.connect(self.serial) if 'connected' in m: # Connected to 127.0.0.1:59865 # Already connected to 127.0.0.1:59865 return False elif '(10061)' in m: # cannot connect to 127.0.0.1:55555: # No connection could be made because the target machine actively refused it. (10061) return False else: return True @run_once def show_online(m): logger.info(f'Emulator online: {m}') @run_once def show_ping(m): logger.info(f'Command ping: {m}') @run_once def show_package(m): logger.info(f'Found azurlane packages: {m}') interval = Timer(0.5).start() timeout = Timer(300).start() new_window = 0 while 1: interval.wait() interval.reset() if timeout.reached(): logger.warning(f'Emulator start timeout') return False # Check emulator window showing up # logger.info([get_focused_window(), get_window_title(get_focused_window())]) if current_window != 0 and new_window == 0: new_window = get_focused_window() if current_window != new_window: logger.info(f'New window showing up: {new_window}, focus back') set_focus_window(current_window) else: new_window = 0 # Check device connection devices = self.list_device().select(serial=serial) # logger.info(devices) if devices: device: AdbDeviceWithStatus = devices.first_or_none() if device.status == 'device': # Emulator online pass if device.status == 'offline': self.adb_client.disconnect(serial) adb_connect() continue else: # Try to connect adb_connect() continue show_online(devices.first_or_none()) # Check command availability try: pong = self.adb_shell(['echo', 'pong']) except Exception as e: logger.info(e) continue show_ping(pong) # Check azuelane package packages = self.list_azurlane_packages(show_log=False) if len(packages): pass else: continue show_package(packages) # All check passed break if new_window != 0 and new_window != current_window: logger.info(f'Minimize new window: {new_window}') minimize_window(new_window) if current_window: logger.info(f'De-flash current window: {current_window}') flash_window(current_window, flash=False) if new_window: logger.info(f'Flash new window: {new_window}') flash_window(new_window, flash=True) logger.info('Emulator start completed') return True def emulator_start(self): logger.hr('Emulator start', level=1) for _ in range(3): # Stop if not self._emulator_function_wrapper(self._emulator_stop): return False # Start if self._emulator_function_wrapper(self._emulator_start): # Success self.emulator_start_watch() return True else: # Failed to start, stop and start again if self._emulator_function_wrapper(self._emulator_stop): continue else: return False logger.error('Failed to start emulator 3 times, stopped') return False def emulator_stop(self): logger.hr('Emulator stop', level=1) return self._emulator_function_wrapper(self._emulator_stop) if __name__ == '__main__': self = PlatformWindows('alas') self.emulator_start()