StarRailCopilot/module/webui/process_manager.py

204 lines
6.7 KiB
Python
Raw Normal View History

2023-05-14 07:48:34 +00:00
import argparse
import os
import queue
import threading
from multiprocessing import Process
from typing import Dict, List, Union
from filelock import FileLock
from module.config.utils import filepath_config
from module.logger import logger, set_file_logger, set_func_logger
from module.webui.fake import get_config_mod, mod_instance
from module.webui.setting import State
from rich.console import Console, ConsoleRenderable
class ProcessManager:
_processes: Dict[str, "ProcessManager"] = {}
def __init__(self, config_name: str = "alas") -> None:
self.config_name = config_name
self._renderable_queue: queue.Queue[ConsoleRenderable] = State.manager.Queue()
self.renderables: List[ConsoleRenderable] = []
self.renderables_max_length = 400
self.renderables_reduce_length = 80
self._process: Process = None
self.thd_log_queue_handler: threading.Thread = None
def start(self, func, ev: threading.Event = None) -> None:
if not self.alive:
if func is None:
func = get_config_mod(self.config_name)
self._process = Process(
target=ProcessManager.run_process,
args=(
self.config_name,
func,
self._renderable_queue,
ev,
),
)
self._process.start()
self.start_log_queue_handler()
def start_log_queue_handler(self):
if (
self.thd_log_queue_handler is not None
and self.thd_log_queue_handler.is_alive()
):
return
self.thd_log_queue_handler = threading.Thread(
target=self._thread_log_queue_handler
)
self.thd_log_queue_handler.start()
def stop(self) -> None:
lock = FileLock(f"{filepath_config(self.config_name)}.lock")
with lock:
if self.alive:
self._process.kill()
self.renderables.append(
f"[{self.config_name}] exited. Reason: Manual stop\n"
)
if self.thd_log_queue_handler is not None:
self.thd_log_queue_handler.join(timeout=1)
if self.thd_log_queue_handler.is_alive():
logger.warning(
"Log queue handler thread does not stop within 1 seconds"
)
logger.info(f"[{self.config_name}] exited")
def _thread_log_queue_handler(self) -> None:
while self.alive:
try:
log = self._renderable_queue.get(timeout=1)
except queue.Empty:
continue
self.renderables.append(log)
if len(self.renderables) > self.renderables_max_length:
self.renderables = self.renderables[self.renderables_reduce_length :]
logger.info("End of log queue handler loop")
@property
def alive(self) -> bool:
if self._process is not None:
return self._process.is_alive()
else:
return False
@property
def state(self) -> int:
if self.alive:
return 1
elif len(self.renderables) == 0:
return 2
else:
console = Console(no_color=True)
with console.capture() as capture:
console.print(self.renderables[-1])
s = capture.get().strip()
if s.endswith("Reason: Manual stop"):
return 2
elif s.endswith("Reason: Finish"):
return 2
elif s.endswith("Reason: Update"):
return 4
else:
return 3
@classmethod
def get_manager(cls, config_name: str) -> "ProcessManager":
"""
Create a new alas if not exists.
"""
if config_name not in cls._processes:
cls._processes[config_name] = ProcessManager(config_name)
return cls._processes[config_name]
@staticmethod
def run_process(
config_name, func: str, q: queue.Queue, e: threading.Event = None
) -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
"--electron", action="store_true", help="Runs by electron client."
)
args, _ = parser.parse_known_args()
State.electron = args.electron
# Setup logger
set_file_logger(name=config_name)
if State.electron:
# https://github.com/LmeSzinc/AzurLaneAutoScript/issues/2051
logger.info("Electron detected, remove log output to stdout")
from module.logger import console_hdlr
logger.removeHandler(console_hdlr)
set_func_logger(func=q.put)
from module.config.config import AzurLaneConfig
AzurLaneConfig.stop_event = e
try:
# Run alas
if func == "alas":
from alas import AzurLaneAutoScript
if e is not None:
AzurLaneAutoScript.stop_event = e
AzurLaneAutoScript(config_name=config_name).loop()
else:
logger.critical(f"No function matched: {func}")
logger.info(f"[{config_name}] exited. Reason: Finish\n")
except Exception as e:
logger.exception(e)
@classmethod
def running_instances(cls) -> List["ProcessManager"]:
l = []
for process in cls._processes.values():
if process.alive:
l.append(process)
return l
@staticmethod
def restart_processes(
instances: List[Union["ProcessManager", str]] = None, ev: threading.Event = None
):
"""
After update and reload, or failed to perform an update,
restart all alas that running before update
"""
logger.hr("Restart alas")
# Load MOD_CONFIG_DICT
mod_instance()
if instances is None:
instances = []
_instances = set()
for instance in instances:
if isinstance(instance, str):
_instances.add(ProcessManager.get_manager(instance))
elif isinstance(instance, ProcessManager):
_instances.add(instance)
try:
with open("./config/reloadalas", mode="r") as f:
for line in f.readlines():
line = line.strip()
_instances.add(ProcessManager.get_manager(line))
except FileNotFoundError:
pass
for process in _instances:
logger.info(f"Starting [{process.config_name}]")
process.start(func=get_config_mod(process.config_name), ev=ev)
try:
os.remove("./config/reloadalas")
except:
pass
logger.info("Start alas complete")