gcsim use queue to limit task nums

This commit is contained in:
omg-xtao 2023-12-03 15:28:42 +08:00 committed by GitHub
parent 031198b08d
commit daf5b7a7e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 109 additions and 24 deletions

View File

@ -1,8 +1,8 @@
import copy
from typing import Optional, TYPE_CHECKING, List, Union, Dict
from typing import Optional, TYPE_CHECKING, List, Union, Dict, Tuple
from enkanetwork import EnkaNetworkResponse
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Message
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import filters
from telegram.helpers import create_deep_linked_url
@ -12,21 +12,24 @@ from core.dependence.redisdb import RedisDB
from core.plugin import Plugin, handler
from core.services.players import PlayersService
from gram_core.services.template.services import TemplateService
from gram_core.services.users.services import UserAdminService
from metadata.shortname import roleToName, roleToId
from modules.gcsim.file import PlayerGCSimScripts
from modules.playercards.file import PlayerCardsFile
from plugins.genshin.gcsim.renderer import GCSimResultRenderer
from plugins.genshin.gcsim.runner import GCSimRunner, GCSimFit
from plugins.genshin.gcsim.runner import GCSimRunner, GCSimFit, GCSimQueueFull, GCSimResult
from plugins.genshin.model.base import CharacterInfo
from plugins.genshin.model.converters.enka import EnkaConverter
from utils.log import logger
if TYPE_CHECKING:
from telegram import Update, Message
from telegram.ext import ContextTypes
__all__ = ("GCSimPlugin",)
async def _no_account_return(message: Message, context: "ContextTypes.DEFAULT_TYPE"):
async def _no_account_return(message: "Message", context: "ContextTypes.DEFAULT_TYPE"):
buttons = [
[
InlineKeyboardButton(
@ -38,7 +41,7 @@ async def _no_account_return(message: Message, context: "ContextTypes.DEFAULT_TY
await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons))
async def _no_character_return(user_id: int, uid: int, message: Message):
async def _no_character_return(user_id: int, uid: int, message: "Message"):
photo = open("resources/img/kitsune.png", "rb")
buttons = [
[
@ -62,6 +65,7 @@ class GCSimPlugin(Plugin):
player_service: PlayersService,
template_service: TemplateService,
redis: RedisDB = None,
user_admin_service: UserAdminService = None,
):
self.player_service = player_service
self.player_cards_file = PlayerCardsFile()
@ -69,6 +73,7 @@ class GCSimPlugin(Plugin):
self.gcsim_runner = GCSimRunner(redis)
self.gcsim_renderer = GCSimResultRenderer(assets_service, template_service)
self.scripts_per_page = 8
self.user_admin_service = user_admin_service
async def initialize(self):
await self.gcsim_runner.initialize()
@ -104,13 +109,23 @@ class GCSimPlugin(Plugin):
)
return buttons
async def _get_uid(self, user_id: int, args: List[str], reply: Optional["Message"]) -> Optional[int]:
@staticmethod
def _filter_fits_by_names(names: List[str], fits: List[GCSimFit]) -> List[GCSimFit]:
if not names:
return fits
return [fit for fit in fits if all(name in [str(i) for i in fit.characters] for name in names)]
async def _get_uid_names(
self, user_id: int, args: List[str], reply: Optional["Message"]
) -> Tuple[Optional[int], List[str]]:
"""通过消息获取 uid优先级args > reply > self"""
uid, user_id_ = None, user_id
uid, user_id_, names = None, user_id, []
if args:
for i in args:
if i is not None and i.isdigit() and len(i) == 9:
uid = int(i)
if i is not None and roleToId(i) is not None:
names.append(roleToName(i))
if reply:
try:
user_id_ = reply.from_user.id
@ -124,7 +139,7 @@ class GCSimPlugin(Plugin):
player_info = await self.player_service.get_player(user_id)
if player_info is not None:
uid = player_info.player_id
return uid
return uid, names
@staticmethod
def _fix_skill_level(data: Dict) -> Dict:
@ -153,7 +168,7 @@ class GCSimPlugin(Plugin):
return character_infos
@handler.command(command="gcsim", block=False)
async def gcsim(self, update: Update, context: "ContextTypes.DEFAULT_TYPE"):
async def gcsim(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
user = update.effective_user
message = update.effective_message
args = self.get_args(context)
@ -166,9 +181,9 @@ class GCSimPlugin(Plugin):
self.add_delete_message_job(reply)
self.add_delete_message_job(message)
return
logger.info("用户 %s[%s] 发出 gcsim 命令", user.full_name, user.id)
uid = await self._get_uid(user.id, args, message.reply_to_message)
uid, names = await self._get_uid_names(user.id, args, message.reply_to_message)
logger.info("用户 %s[%s] 发出 gcsim 命令 UID[%s] NAMES[%s]", user.full_name, user.id, uid, " ".join(names))
if uid is None:
return await _no_account_return(message, context)
@ -179,6 +194,7 @@ class GCSimPlugin(Plugin):
fits = await self.gcsim_runner.get_fits(uid)
if not fits:
fits = await self.gcsim_runner.calculate_fits(uid, character_infos)
fits = self._filter_fits_by_names(names, fits)
if not fits:
await message.reply_text("好像没有找到适合旅行者的配队呢,要不更新下面板吧")
return
@ -250,9 +266,6 @@ class GCSimPlugin(Plugin):
user = callback_query.from_user
message = callback_query.message
user_id, uid, script_key = callback_query.data.split("|")[1:]
msg_to_reply = message
if message.reply_to_message:
msg_to_reply = message.reply_to_message
logger.info("用户 %s[%s] GCSim运行请求 || %s", user.full_name, user.id, callback_query.data)
if str(user.id) != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
@ -263,8 +276,26 @@ class GCSimPlugin(Plugin):
if not character_infos:
return await _no_character_return(user.id, uid, message)
await callback_query.edit_message_text("GCSim 运行中...", reply_markup=InlineKeyboardMarkup([]))
result = await self.gcsim_runner.run(user_id, uid, script_key, character_infos)
await callback_query.edit_message_text(f"GCSim {script_key} 运行中...", reply_markup=InlineKeyboardMarkup([]))
results = []
callback_task = self._callback(update, results, character_infos)
priority = 1 if await self.user_admin_service.is_admin(user.id) else 2
try:
await self.gcsim_runner.run(user_id, uid, script_key, character_infos, results, callback_task, priority)
except GCSimQueueFull:
await callback_query.edit_message_text("派蒙任务过多忙碌中,请稍后再试")
return
async def _callback(
self, update: "Update", results: List[GCSimResult], character_infos: List[CharacterInfo]
) -> None:
result = results[0]
callback_query = update.callback_query
message = callback_query.message
_, uid, script_key = callback_query.data.split("|")[1:]
msg_to_reply = message
if message.reply_to_message:
msg_to_reply = message.reply_to_message
if result.error:
await callback_query.edit_message_text(result.error)
else:

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Optional, List
from typing import Optional, List, TYPE_CHECKING
from core.dependence.assets import AssetsService
from gram_core.services.template.models import RenderResult
@ -9,12 +9,23 @@ from metadata.shortname import idToName, elementToName, elementsToColor
from plugins.genshin.model import GCSim, GCSimCharacterInfo, CharacterInfo
from plugins.genshin.model.converters.gcsim import GCSimConverter
if TYPE_CHECKING:
from utils.typedefs import StrOrInt
class GCSimResultRenderer:
def __init__(self, assets_service: AssetsService, template_service: TemplateService):
self.assets_service = assets_service
self.template_service = template_service
@staticmethod
def fix_asset_id(asset_id: "StrOrInt") -> "StrOrInt":
if "-" in str(asset_id):
_asset_id = asset_id.split("-")[0]
if _asset_id.isnumeric():
return int(_asset_id)
return asset_id
async def prepare_result(
self, result_path: Path, script: GCSim, character_infos: List[CharacterInfo]
) -> Optional[dict]:
@ -23,6 +34,7 @@ class GCSimResultRenderer:
result["extra"] = {}
for idx, character_details in enumerate(result["character_details"]):
asset_id, _ = GCSimConverter.to_character(character_details["name"])
asset_id = self.fix_asset_id(asset_id)
gcsim_character: GCSimCharacterInfo = next(
filter(lambda gc, cn=character_details["name"]: gc.character == cn, script.characters), None
)

View File

@ -4,8 +4,8 @@ import platform
import time
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from typing import Optional, Dict, List, Union, TYPE_CHECKING, Tuple
from queue import PriorityQueue
from typing import Optional, Dict, List, Union, TYPE_CHECKING, Tuple, Coroutine, Any
import gcsim_pypi
from pydantic import BaseModel
@ -72,6 +72,21 @@ def _get_limit_command() -> str:
return ""
class GCSimRunnerTask:
def __init__(self, task: Coroutine[Any, Any, None]):
self.task = task
def __lt__(self, other: "GCSimRunnerTask") -> bool:
return False
async def run(self) -> None:
await self.task
class GCSimQueueFull(Exception):
pass
class GCSimRunner:
def __init__(self, client: "RedisDB"):
self.initialized = False
@ -81,7 +96,8 @@ class GCSimRunner:
self.scripts: Dict[str, GCSim] = {}
max_concurrent_gcsim = multiprocessing.cpu_count()
self.sema = asyncio.BoundedSemaphore(max_concurrent_gcsim)
self.queue: Queue[None] = Queue()
self.queue_size = 21
self.queue: PriorityQueue[List[int, GCSimRunnerTask]] = PriorityQueue(maxsize=self.queue_size)
self.cache = GCSimCache(client)
@staticmethod
@ -130,8 +146,23 @@ class GCSimRunner:
logger.debug("加载 %d GCSim 脚本耗时 %.2f", len(self.scripts), time.time() - now)
self.initialized = True
@staticmethod
async def _execute_queue(
gcsim_task: Coroutine[Any, Any, GCSimResult],
results: List[GCSimResult],
callback_task: Coroutine[Any, Any, None],
) -> None:
data = await gcsim_task
results.append(data)
await callback_task
async def _execute_gcsim(
self, user_id: str, uid: str, script_key: str, added_time: float, character_infos: List[CharacterInfo]
self,
user_id: str,
uid: str,
script_key: str,
added_time: float,
character_infos: List[CharacterInfo],
) -> GCSimResult:
script = self.scripts.get(script_key)
if script is None:
@ -186,11 +217,22 @@ class GCSimRunner:
uid: str,
script_key: str,
character_infos: List[CharacterInfo],
) -> GCSimResult:
results: List[GCSimResult],
callback_task: Coroutine[Any, Any, None],
priority: int = 2,
) -> None:
start_time = time.time()
gcsim_task = self._execute_gcsim(user_id, uid, script_key, start_time, character_infos)
queue_task = GCSimRunnerTask(self._execute_queue(gcsim_task, results, callback_task))
if priority == 2 and self.queue.qsize() >= (self.queue_size - 1):
raise GCSimQueueFull()
if self.queue.full():
raise GCSimQueueFull()
self.queue.put([priority, queue_task])
async with self.sema:
result = await self._execute_gcsim(user_id, uid, script_key, start_time, character_infos)
return result
if not self.queue.empty():
_, task = self.queue.get()
await task.run()
async def calculate_fits(self, uid: Union[int, str], character_infos: List[CharacterInfo]) -> List[GCSimFit]:
fits = []