From daf5b7a7e950ddf37c43007928661fd7e4b0fec1 Mon Sep 17 00:00:00 2001 From: omg-xtao <100690902+omg-xtao@users.noreply.github.com> Date: Sun, 3 Dec 2023 15:28:42 +0800 Subject: [PATCH] :zap: gcsim use queue to limit task nums --- plugins/genshin/gcsim/plugin.py | 63 +++++++++++++++++++++++-------- plugins/genshin/gcsim/renderer.py | 14 ++++++- plugins/genshin/gcsim/runner.py | 56 +++++++++++++++++++++++---- 3 files changed, 109 insertions(+), 24 deletions(-) diff --git a/plugins/genshin/gcsim/plugin.py b/plugins/genshin/gcsim/plugin.py index 6d0d85e1..9bb3e73f 100644 --- a/plugins/genshin/gcsim/plugin.py +++ b/plugins/genshin/gcsim/plugin.py @@ -1,8 +1,8 @@ import copy -from typing import Optional, TYPE_CHECKING, List, Union, Dict +from typing import Optional, TYPE_CHECKING, List, Union, Dict, Tuple from enkanetwork import EnkaNetworkResponse -from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Message +from telegram import InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import filters from telegram.helpers import create_deep_linked_url @@ -12,21 +12,24 @@ from core.dependence.redisdb import RedisDB from core.plugin import Plugin, handler from core.services.players import PlayersService from gram_core.services.template.services import TemplateService +from gram_core.services.users.services import UserAdminService +from metadata.shortname import roleToName, roleToId from modules.gcsim.file import PlayerGCSimScripts from modules.playercards.file import PlayerCardsFile from plugins.genshin.gcsim.renderer import GCSimResultRenderer -from plugins.genshin.gcsim.runner import GCSimRunner, GCSimFit +from plugins.genshin.gcsim.runner import GCSimRunner, GCSimFit, GCSimQueueFull, GCSimResult from plugins.genshin.model.base import CharacterInfo from plugins.genshin.model.converters.enka import EnkaConverter from utils.log import logger if TYPE_CHECKING: + from telegram import Update, Message from telegram.ext import ContextTypes __all__ = ("GCSimPlugin",) -async def _no_account_return(message: Message, context: "ContextTypes.DEFAULT_TYPE"): +async def _no_account_return(message: "Message", context: "ContextTypes.DEFAULT_TYPE"): buttons = [ [ InlineKeyboardButton( @@ -38,7 +41,7 @@ async def _no_account_return(message: Message, context: "ContextTypes.DEFAULT_TY await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons)) -async def _no_character_return(user_id: int, uid: int, message: Message): +async def _no_character_return(user_id: int, uid: int, message: "Message"): photo = open("resources/img/kitsune.png", "rb") buttons = [ [ @@ -62,6 +65,7 @@ class GCSimPlugin(Plugin): player_service: PlayersService, template_service: TemplateService, redis: RedisDB = None, + user_admin_service: UserAdminService = None, ): self.player_service = player_service self.player_cards_file = PlayerCardsFile() @@ -69,6 +73,7 @@ class GCSimPlugin(Plugin): self.gcsim_runner = GCSimRunner(redis) self.gcsim_renderer = GCSimResultRenderer(assets_service, template_service) self.scripts_per_page = 8 + self.user_admin_service = user_admin_service async def initialize(self): await self.gcsim_runner.initialize() @@ -104,13 +109,23 @@ class GCSimPlugin(Plugin): ) return buttons - async def _get_uid(self, user_id: int, args: List[str], reply: Optional["Message"]) -> Optional[int]: + @staticmethod + def _filter_fits_by_names(names: List[str], fits: List[GCSimFit]) -> List[GCSimFit]: + if not names: + return fits + return [fit for fit in fits if all(name in [str(i) for i in fit.characters] for name in names)] + + async def _get_uid_names( + self, user_id: int, args: List[str], reply: Optional["Message"] + ) -> Tuple[Optional[int], List[str]]: """通过消息获取 uid,优先级:args > reply > self""" - uid, user_id_ = None, user_id + uid, user_id_, names = None, user_id, [] if args: for i in args: if i is not None and i.isdigit() and len(i) == 9: uid = int(i) + if i is not None and roleToId(i) is not None: + names.append(roleToName(i)) if reply: try: user_id_ = reply.from_user.id @@ -124,7 +139,7 @@ class GCSimPlugin(Plugin): player_info = await self.player_service.get_player(user_id) if player_info is not None: uid = player_info.player_id - return uid + return uid, names @staticmethod def _fix_skill_level(data: Dict) -> Dict: @@ -153,7 +168,7 @@ class GCSimPlugin(Plugin): return character_infos @handler.command(command="gcsim", block=False) - async def gcsim(self, update: Update, context: "ContextTypes.DEFAULT_TYPE"): + async def gcsim(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"): user = update.effective_user message = update.effective_message args = self.get_args(context) @@ -166,9 +181,9 @@ class GCSimPlugin(Plugin): self.add_delete_message_job(reply) self.add_delete_message_job(message) return - logger.info("用户 %s[%s] 发出 gcsim 命令", user.full_name, user.id) - uid = await self._get_uid(user.id, args, message.reply_to_message) + uid, names = await self._get_uid_names(user.id, args, message.reply_to_message) + logger.info("用户 %s[%s] 发出 gcsim 命令 UID[%s] NAMES[%s]", user.full_name, user.id, uid, " ".join(names)) if uid is None: return await _no_account_return(message, context) @@ -179,6 +194,7 @@ class GCSimPlugin(Plugin): fits = await self.gcsim_runner.get_fits(uid) if not fits: fits = await self.gcsim_runner.calculate_fits(uid, character_infos) + fits = self._filter_fits_by_names(names, fits) if not fits: await message.reply_text("好像没有找到适合旅行者的配队呢,要不更新下面板吧") return @@ -250,9 +266,6 @@ class GCSimPlugin(Plugin): user = callback_query.from_user message = callback_query.message user_id, uid, script_key = callback_query.data.split("|")[1:] - msg_to_reply = message - if message.reply_to_message: - msg_to_reply = message.reply_to_message logger.info("用户 %s[%s] GCSim运行请求 || %s", user.full_name, user.id, callback_query.data) if str(user.id) != user_id: await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) @@ -263,8 +276,26 @@ class GCSimPlugin(Plugin): if not character_infos: return await _no_character_return(user.id, uid, message) - await callback_query.edit_message_text("GCSim 运行中...", reply_markup=InlineKeyboardMarkup([])) - result = await self.gcsim_runner.run(user_id, uid, script_key, character_infos) + await callback_query.edit_message_text(f"GCSim {script_key} 运行中...", reply_markup=InlineKeyboardMarkup([])) + results = [] + callback_task = self._callback(update, results, character_infos) + priority = 1 if await self.user_admin_service.is_admin(user.id) else 2 + try: + await self.gcsim_runner.run(user_id, uid, script_key, character_infos, results, callback_task, priority) + except GCSimQueueFull: + await callback_query.edit_message_text("派蒙任务过多忙碌中,请稍后再试") + return + + async def _callback( + self, update: "Update", results: List[GCSimResult], character_infos: List[CharacterInfo] + ) -> None: + result = results[0] + callback_query = update.callback_query + message = callback_query.message + _, uid, script_key = callback_query.data.split("|")[1:] + msg_to_reply = message + if message.reply_to_message: + msg_to_reply = message.reply_to_message if result.error: await callback_query.edit_message_text(result.error) else: diff --git a/plugins/genshin/gcsim/renderer.py b/plugins/genshin/gcsim/renderer.py index 6afbb5a8..faebe119 100644 --- a/plugins/genshin/gcsim/renderer.py +++ b/plugins/genshin/gcsim/renderer.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import Optional, List +from typing import Optional, List, TYPE_CHECKING from core.dependence.assets import AssetsService from gram_core.services.template.models import RenderResult @@ -9,12 +9,23 @@ from metadata.shortname import idToName, elementToName, elementsToColor from plugins.genshin.model import GCSim, GCSimCharacterInfo, CharacterInfo from plugins.genshin.model.converters.gcsim import GCSimConverter +if TYPE_CHECKING: + from utils.typedefs import StrOrInt + class GCSimResultRenderer: def __init__(self, assets_service: AssetsService, template_service: TemplateService): self.assets_service = assets_service self.template_service = template_service + @staticmethod + def fix_asset_id(asset_id: "StrOrInt") -> "StrOrInt": + if "-" in str(asset_id): + _asset_id = asset_id.split("-")[0] + if _asset_id.isnumeric(): + return int(_asset_id) + return asset_id + async def prepare_result( self, result_path: Path, script: GCSim, character_infos: List[CharacterInfo] ) -> Optional[dict]: @@ -23,6 +34,7 @@ class GCSimResultRenderer: result["extra"] = {} for idx, character_details in enumerate(result["character_details"]): asset_id, _ = GCSimConverter.to_character(character_details["name"]) + asset_id = self.fix_asset_id(asset_id) gcsim_character: GCSimCharacterInfo = next( filter(lambda gc, cn=character_details["name"]: gc.character == cn, script.characters), None ) diff --git a/plugins/genshin/gcsim/runner.py b/plugins/genshin/gcsim/runner.py index 90cf4045..82855e26 100644 --- a/plugins/genshin/gcsim/runner.py +++ b/plugins/genshin/gcsim/runner.py @@ -4,8 +4,8 @@ import platform import time from dataclasses import dataclass from pathlib import Path -from queue import Queue -from typing import Optional, Dict, List, Union, TYPE_CHECKING, Tuple +from queue import PriorityQueue +from typing import Optional, Dict, List, Union, TYPE_CHECKING, Tuple, Coroutine, Any import gcsim_pypi from pydantic import BaseModel @@ -72,6 +72,21 @@ def _get_limit_command() -> str: return "" +class GCSimRunnerTask: + def __init__(self, task: Coroutine[Any, Any, None]): + self.task = task + + def __lt__(self, other: "GCSimRunnerTask") -> bool: + return False + + async def run(self) -> None: + await self.task + + +class GCSimQueueFull(Exception): + pass + + class GCSimRunner: def __init__(self, client: "RedisDB"): self.initialized = False @@ -81,7 +96,8 @@ class GCSimRunner: self.scripts: Dict[str, GCSim] = {} max_concurrent_gcsim = multiprocessing.cpu_count() self.sema = asyncio.BoundedSemaphore(max_concurrent_gcsim) - self.queue: Queue[None] = Queue() + self.queue_size = 21 + self.queue: PriorityQueue[List[int, GCSimRunnerTask]] = PriorityQueue(maxsize=self.queue_size) self.cache = GCSimCache(client) @staticmethod @@ -130,8 +146,23 @@ class GCSimRunner: logger.debug("加载 %d GCSim 脚本耗时 %.2f 秒", len(self.scripts), time.time() - now) self.initialized = True + @staticmethod + async def _execute_queue( + gcsim_task: Coroutine[Any, Any, GCSimResult], + results: List[GCSimResult], + callback_task: Coroutine[Any, Any, None], + ) -> None: + data = await gcsim_task + results.append(data) + await callback_task + async def _execute_gcsim( - self, user_id: str, uid: str, script_key: str, added_time: float, character_infos: List[CharacterInfo] + self, + user_id: str, + uid: str, + script_key: str, + added_time: float, + character_infos: List[CharacterInfo], ) -> GCSimResult: script = self.scripts.get(script_key) if script is None: @@ -186,11 +217,22 @@ class GCSimRunner: uid: str, script_key: str, character_infos: List[CharacterInfo], - ) -> GCSimResult: + results: List[GCSimResult], + callback_task: Coroutine[Any, Any, None], + priority: int = 2, + ) -> None: start_time = time.time() + gcsim_task = self._execute_gcsim(user_id, uid, script_key, start_time, character_infos) + queue_task = GCSimRunnerTask(self._execute_queue(gcsim_task, results, callback_task)) + if priority == 2 and self.queue.qsize() >= (self.queue_size - 1): + raise GCSimQueueFull() + if self.queue.full(): + raise GCSimQueueFull() + self.queue.put([priority, queue_task]) async with self.sema: - result = await self._execute_gcsim(user_id, uid, script_key, start_time, character_infos) - return result + if not self.queue.empty(): + _, task = self.queue.get() + await task.run() async def calculate_fits(self, uid: Union[int, str], character_infos: List[CharacterInfo]) -> List[GCSimFit]: fits = []