mirror of
https://github.com/PaiGramTeam/PaiGram.git
synced 2024-11-25 09:37:30 +00:00
274 lines
9.9 KiB
Python
274 lines
9.9 KiB
Python
import asyncio
|
|
import multiprocessing
|
|
import platform
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from queue import PriorityQueue
|
|
from typing import Optional, Dict, List, Union, TYPE_CHECKING, Tuple, Coroutine, Any
|
|
|
|
import gcsim_pypi
|
|
from pydantic import BaseModel
|
|
|
|
from gram_core.config import config
|
|
from metadata.shortname import idToName
|
|
from modules.apihelper.client.components.remote import Remote
|
|
from modules.gcsim.cache import GCSimCache
|
|
from modules.gcsim.file import PlayerGCSimScripts
|
|
from plugins.genshin.model.base import CharacterInfo, Character
|
|
from plugins.genshin.model.converters.gcsim import GCSimConverter
|
|
from plugins.genshin.model.gcsim import GCSim, GCSimCharacter
|
|
from utils.const import DATA_DIR
|
|
from utils.log import logger
|
|
|
|
if TYPE_CHECKING:
|
|
from core.dependence.redisdb import RedisDB
|
|
|
|
GCSIM_SCRIPTS_PATH = DATA_DIR / "gcsim" / "scripts"
|
|
GCSIM_SCRIPTS_PATH.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
class FitCharacter(BaseModel):
|
|
id: int
|
|
name: str
|
|
gcsim: GCSimCharacter
|
|
character: Character
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
|
|
class GCSimFit(BaseModel):
|
|
script_key: str
|
|
fit_count: int
|
|
characters: List[FitCharacter]
|
|
total_levels: int
|
|
total_weapon_levels: int
|
|
|
|
|
|
@dataclass
|
|
class GCSimResult:
|
|
error: Optional[str]
|
|
user_id: str
|
|
uid: str
|
|
script_key: str
|
|
script: Optional[GCSim] = None
|
|
file_id: Optional[str] = None
|
|
|
|
|
|
def _get_gcsim_bin_name() -> str:
|
|
if platform.system() == "Windows":
|
|
return "gcsim.exe"
|
|
bin_name = "gcsim"
|
|
if platform.system() == "Darwin":
|
|
bin_name += ".darwin"
|
|
if platform.machine() == "arm64":
|
|
bin_name += ".arm64"
|
|
return bin_name
|
|
|
|
|
|
def _get_limit_command() -> str:
|
|
if platform.system() == "Linux":
|
|
return "ulimit -m 1000000 && ulimit -v 1000000 && timeout 120 "
|
|
return ""
|
|
|
|
|
|
class GCSimRunnerTask:
|
|
def __init__(self, task: Coroutine[Any, Any, None]):
|
|
self.task = task
|
|
|
|
def __lt__(self, other: "GCSimRunnerTask") -> bool:
|
|
return False
|
|
|
|
async def run(self) -> None:
|
|
await self.task
|
|
|
|
|
|
class GCSimQueueFull(Exception):
|
|
pass
|
|
|
|
|
|
class GCSimRunner:
|
|
def __init__(self, client: "RedisDB"):
|
|
self.initialized = False
|
|
self.bin_path = None
|
|
self.player_gcsim_scripts = PlayerGCSimScripts()
|
|
self.gcsim_version: Optional[str] = None
|
|
self.scripts: Dict[str, GCSim] = {}
|
|
max_concurrent_gcsim = multiprocessing.cpu_count()
|
|
self.sema = asyncio.BoundedSemaphore(max_concurrent_gcsim)
|
|
self.queue_size = 21
|
|
self.queue: PriorityQueue[List[int, GCSimRunnerTask]] = PriorityQueue(maxsize=self.queue_size)
|
|
self.cache = GCSimCache(client)
|
|
|
|
@staticmethod
|
|
def check_gcsim_script(name: str, script: str) -> Optional[GCSim]:
|
|
try:
|
|
return GCSimConverter.from_gcsim_script(script)
|
|
except ValueError as e:
|
|
logger.error("无法解析 GCSim 脚本 %s: %s", name, e)
|
|
return None
|
|
|
|
async def refresh(self):
|
|
self.player_gcsim_scripts.clear_fits()
|
|
self.scripts.clear()
|
|
new_scripts = await Remote.get_gcsim_scripts()
|
|
for name, text in new_scripts.items():
|
|
if script := self.check_gcsim_script(name, text):
|
|
self.scripts[name] = script
|
|
for path in GCSIM_SCRIPTS_PATH.iterdir():
|
|
if path.is_file():
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
try:
|
|
if script := self.check_gcsim_script(path.name, f.read()):
|
|
self.scripts[path.stem] = script
|
|
except UnicodeError as e:
|
|
logger.error("无法读取 GCSim 脚本 %s: %s", path.name, e)
|
|
|
|
async def initialize(self):
|
|
gcsim_pypi_path = Path(gcsim_pypi.__file__).parent
|
|
|
|
self.bin_path = gcsim_pypi_path.joinpath("bin").joinpath(_get_gcsim_bin_name())
|
|
|
|
process = await asyncio.create_subprocess_exec(
|
|
self.bin_path, "-version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await process.communicate()
|
|
|
|
if process.returncode == 0:
|
|
self.gcsim_version = stdout.decode().splitlines()[0]
|
|
logger.debug("GCSim version: %s", self.gcsim_version)
|
|
else:
|
|
logger.error("GCSim 运行时出错: %s", stderr.decode())
|
|
|
|
now = time.time()
|
|
await self.refresh()
|
|
logger.debug("加载 %d GCSim 脚本耗时 %.2f 秒", len(self.scripts), time.time() - now)
|
|
self.initialized = True
|
|
|
|
@staticmethod
|
|
async def _execute_queue(
|
|
gcsim_task: Coroutine[Any, Any, GCSimResult],
|
|
results: List[GCSimResult],
|
|
callback_task: Coroutine[Any, Any, None],
|
|
) -> None:
|
|
data = await gcsim_task
|
|
results.append(data)
|
|
await callback_task
|
|
|
|
async def _execute_gcsim(
|
|
self,
|
|
user_id: str,
|
|
uid: str,
|
|
script_key: str,
|
|
added_time: float,
|
|
character_infos: List[CharacterInfo],
|
|
) -> GCSimResult:
|
|
script = self.scripts.get(script_key)
|
|
if script is None:
|
|
return GCSimResult(error="未找到脚本", user_id=user_id, uid=uid, script_key=script_key)
|
|
try:
|
|
merged_script = GCSimConverter.merge_character_infos(script.copy(), character_infos)
|
|
except ValueError:
|
|
return GCSimResult(error="无法合并角色信息", user_id=user_id, uid=uid, script_key=script_key)
|
|
if not config.debug:
|
|
if file_id := await self.cache.get_cache(uid, hash(str(merged_script))):
|
|
return GCSimResult(error=None, user_id=user_id, uid=uid, script_key=script_key, file_id=file_id)
|
|
await self.player_gcsim_scripts.write_script(uid, script_key, str(merged_script))
|
|
limit = _get_limit_command()
|
|
command = [
|
|
self.bin_path,
|
|
"-c",
|
|
self.player_gcsim_scripts.get_script_path(uid, script_key).absolute().as_posix(),
|
|
"-out",
|
|
self.player_gcsim_scripts.get_result_path(uid, script_key).absolute().as_posix(),
|
|
]
|
|
process = await asyncio.create_subprocess_shell(
|
|
limit + " ".join([str(i) for i in command]),
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
)
|
|
stdout, stderr = await process.communicate()
|
|
logger.debug("GCSim 脚本 (%s|%s|%s) 用时 %.2fs", user_id, uid, script_key, time.time() - added_time)
|
|
error = None
|
|
if stderr:
|
|
error = stderr.decode()[:500]
|
|
if "out of memory" in error:
|
|
error = "超出内存限制"
|
|
if process.returncode == 124:
|
|
error = "超出运行时间限制"
|
|
if error:
|
|
logger.error("GCSim 脚本 (%s|%s|%s) 错误: %s", user_id, uid, script_key, error)
|
|
return GCSimResult(error=error, user_id=user_id, uid=uid, script_key=script_key, script=merged_script)
|
|
if stdout:
|
|
logger.info("GCSim 脚本 (%s|%s|%s) 运行完成", user_id, uid, script_key)
|
|
logger.debug("GCSim 脚本 (%s|%s|%s) 输出: %s", user_id, uid, script_key, stdout.decode())
|
|
return GCSimResult(error=None, user_id=user_id, uid=uid, script_key=script_key, script=merged_script)
|
|
return GCSimResult(
|
|
error="No output",
|
|
user_id=user_id,
|
|
uid=uid,
|
|
script_key=script_key,
|
|
script=merged_script,
|
|
)
|
|
|
|
async def run(
|
|
self,
|
|
user_id: str,
|
|
uid: str,
|
|
script_key: str,
|
|
character_infos: List[CharacterInfo],
|
|
results: List[GCSimResult],
|
|
callback_task: Coroutine[Any, Any, None],
|
|
priority: int = 2,
|
|
) -> None:
|
|
start_time = time.time()
|
|
gcsim_task = self._execute_gcsim(user_id, uid, script_key, start_time, character_infos)
|
|
queue_task = GCSimRunnerTask(self._execute_queue(gcsim_task, results, callback_task))
|
|
if priority == 2 and self.queue.qsize() >= (self.queue_size - 1):
|
|
raise GCSimQueueFull()
|
|
if self.queue.full():
|
|
raise GCSimQueueFull()
|
|
self.queue.put([priority, queue_task])
|
|
async with self.sema:
|
|
if not self.queue.empty():
|
|
_, task = self.queue.get()
|
|
await task.run()
|
|
|
|
async def calculate_fits(self, uid: Union[int, str], character_infos: List[CharacterInfo]) -> List[GCSimFit]:
|
|
fits = []
|
|
for key, script in self.scripts.items():
|
|
# 空和莹会被认为是两个角色
|
|
fit_characters: List[Tuple[CharacterInfo, GCSimCharacter]] = []
|
|
for ch in character_infos:
|
|
gcsim_character = GCSimConverter.from_character(ch.character)
|
|
if gcsim_character in [c.character for c in script.characters]:
|
|
fit_characters.append((ch, gcsim_character))
|
|
if fit_characters:
|
|
fits.append(
|
|
GCSimFit(
|
|
script_key=key,
|
|
characters=[
|
|
FitCharacter(id=ch[0].id, name=idToName(ch[0].id), gcsim=ch[1], character=ch[0].character)
|
|
for ch in fit_characters
|
|
],
|
|
fit_count=len(fit_characters),
|
|
total_levels=sum(ch.level for ch in script.characters),
|
|
total_weapon_levels=sum(ch.weapon_info.level for ch in script.characters),
|
|
)
|
|
)
|
|
fits = sorted(
|
|
fits,
|
|
key=lambda x: (x.fit_count, x.total_levels, x.total_weapon_levels),
|
|
reverse=True,
|
|
)
|
|
await self.player_gcsim_scripts.write_fits(uid, [fit.dict() for fit in fits])
|
|
return fits
|
|
|
|
async def get_fits(self, uid: Union[int, str]) -> List[GCSimFit]:
|
|
return [GCSimFit(**fit) for fit in self.player_gcsim_scripts.get_fits(uid)]
|
|
|
|
async def remove_fits(self, uid: Union[int, str]) -> None:
|
|
self.player_gcsim_scripts.remove_fits(uid)
|