import argparse import os import queue import threading from multiprocessing import Process from typing import Dict, List, Union import inflection from filelock import FileLock from rich.console import Console, ConsoleRenderable from module.config.utils import filepath_config from module.logger import logger, set_file_logger, set_func_logger from module.webui.fake import get_config_mod, mod_instance from module.webui.setting import State from module.webui.submodule.utils import get_available_func class ProcessManager: _processes: Dict[str, "ProcessManager"] = {} def __init__(self, config_name: str = "alas") -> None: self.config_name = config_name self._renderable_queue: queue.Queue[ConsoleRenderable] = State.manager.Queue() self.renderables: List[ConsoleRenderable] = [] self.renderables_max_length = 400 self.renderables_reduce_length = 80 self._process: Process = None self.thd_log_queue_handler: threading.Thread = None def start(self, func, ev: threading.Event = None) -> None: if not self.alive: if func is None: func = get_config_mod(self.config_name) self._process = Process( target=ProcessManager.run_process, args=( self.config_name, func, self._renderable_queue, ev, ), ) self._process.start() self.start_log_queue_handler() def start_log_queue_handler(self): if ( self.thd_log_queue_handler is not None and self.thd_log_queue_handler.is_alive() ): return self.thd_log_queue_handler = threading.Thread( target=self._thread_log_queue_handler ) self.thd_log_queue_handler.start() def stop(self) -> None: lock = FileLock(f"{filepath_config(self.config_name)}.lock") with lock: if self.alive: self._process.kill() self.renderables.append( f"[{self.config_name}] exited. Reason: Manual stop\n" ) if self.thd_log_queue_handler is not None: self.thd_log_queue_handler.join(timeout=1) if self.thd_log_queue_handler.is_alive(): logger.warning( "Log queue handler thread does not stop within 1 seconds" ) logger.info(f"[{self.config_name}] exited") def _thread_log_queue_handler(self) -> None: while self.alive: try: log = self._renderable_queue.get(timeout=1) except queue.Empty: continue self.renderables.append(log) if len(self.renderables) > self.renderables_max_length: self.renderables = self.renderables[self.renderables_reduce_length :] logger.info("End of log queue handler loop") @property def alive(self) -> bool: if self._process is not None: return self._process.is_alive() else: return False @property def state(self) -> int: if self.alive: return 1 elif len(self.renderables) == 0: return 2 else: console = Console(no_color=True) with console.capture() as capture: console.print(self.renderables[-1]) s = capture.get().strip() if s.endswith("Reason: Manual stop"): return 2 elif s.endswith("Reason: Finish"): return 2 elif s.endswith("Reason: Update"): return 4 else: return 3 @classmethod def get_manager(cls, config_name: str) -> "ProcessManager": """ Create a new alas if not exists. """ if config_name not in cls._processes: cls._processes[config_name] = ProcessManager(config_name) return cls._processes[config_name] @staticmethod def run_process( config_name, func: str, q: queue.Queue, e: threading.Event = None ) -> None: parser = argparse.ArgumentParser() parser.add_argument( "--electron", action="store_true", help="Runs by electron client." ) args, _ = parser.parse_known_args() State.electron = args.electron # Setup logger set_file_logger(name=config_name) if State.electron: # https://github.com/LmeSzinc/AzurLaneAutoScript/issues/2051 logger.info("Electron detected, remove log output to stdout") from module.logger.logger import console_hdlr logger.removeHandler(console_hdlr) set_func_logger(func=q.put) from module.config.config import AzurLaneConfig AzurLaneConfig.stop_event = e try: # Run alas if func == "alas": from module.alas import AzurLaneAutoScript from src import StarRailCopilot if e is not None: AzurLaneAutoScript.stop_event = e StarRailCopilot(config_name=config_name).loop() elif func in get_available_func(): from src import StarRailCopilot StarRailCopilot(config_name=config_name).run(inflection.underscore(func)) else: logger.critical(f"No function matched: {func}") logger.info(f"[{config_name}] exited. Reason: Finish\n") except Exception as e: logger.exception(e) @classmethod def running_instances(cls) -> List["ProcessManager"]: l = [] for process in cls._processes.values(): if process.alive: l.append(process) return l @staticmethod def restart_processes( instances: List[Union["ProcessManager", str]] = None, ev: threading.Event = None ): """ After update and reload, or failed to perform an update, restart all alas that running before update """ logger.hr("Restart alas") # Load MOD_CONFIG_DICT mod_instance() if instances is None: instances = [] _instances = set() for instance in instances: if isinstance(instance, str): _instances.add(ProcessManager.get_manager(instance)) elif isinstance(instance, ProcessManager): _instances.add(instance) try: with open("./config/reloadalas", mode="r") as f: for line in f.readlines(): line = line.strip() _instances.add(ProcessManager.get_manager(line)) except FileNotFoundError: pass for process in _instances: logger.info(f"Starting [{process.config_name}]") process.start(func=get_config_mod(process.config_name), ev=ev) try: os.remove("./config/reloadalas") except: pass logger.info("Start alas complete")