diff --git a/plugins/genshin/redeem.py b/plugins/genshin/redeem.py deleted file mode 100644 index cc73ba48..00000000 --- a/plugins/genshin/redeem.py +++ /dev/null @@ -1,66 +0,0 @@ -from typing import TYPE_CHECKING - -from simnet.errors import RedemptionInvalid, RedemptionClaimed, RegionNotSupported, RedemptionCooldown -from telegram import Update -from telegram.ext import CallbackContext -from telegram.ext import filters - -from core.plugin import Plugin, handler -from plugins.tools.genshin import GenshinHelper -from utils.log import logger - -if TYPE_CHECKING: - from simnet import GenshinClient - - -class Redeem(Plugin): - """兑换码兑换""" - - def __init__( - self, - genshin_helper: GenshinHelper, - ): - self.genshin_helper = genshin_helper - - async def redeem_code(self, uid: int, code: str) -> str: - try: - if not code: - raise RedemptionInvalid - async with self.genshin_helper.genshin(uid) as client: - client: "GenshinClient" - await client.redeem_code_by_hoyolab(code) - msg = "兑换码兑换成功。" - except RegionNotSupported: - msg = "此服务器暂不支持进行兑换哦~" - except RedemptionInvalid: - msg = "兑换码格式不正确,请确认。" - except RedemptionClaimed: - msg = "此兑换码已经兑换过了。" - except RedemptionCooldown as e: - msg = e.message - return msg - - @handler.command(command="redeem", cookie=True, block=False) - @handler.message(filters=filters.Regex("^兑换码兑换(.*)"), cookie=True, block=False) - async def command_start(self, update: Update, context: CallbackContext) -> None: - user_id = await self.get_real_user_id(update) - message = update.effective_message - args = self.get_args(context) - code = args[0] if args else None - self.log_user(update, logger.info, "兑换码兑换命令请求 code[%s]", code) - if filters.ChatType.GROUPS.filter(message): - self.add_delete_message_job(message) - msg = await self.redeem_code(user_id, code) - reply_message = await message.reply_text(msg) - if filters.ChatType.GROUPS.filter(reply_message): - self.add_delete_message_job(reply_message) - - @handler.command(command="start", filters=filters.Regex(r" redeem_(.*)"), block=False) - async def start_redeem(self, update: Update, context: CallbackContext) -> None: - user = update.effective_user - message = update.effective_message - args = self.get_args(context) - code = args[0].split("_")[1] - logger.info("用户 %s[%s] 通过start命令 进入兑换码兑换流程 code[%s]", user.full_name, user.id, code) - msg = await self.redeem_code(user.id, code) - await message.reply_text(msg) diff --git a/plugins/genshin/redeem/redeem.py b/plugins/genshin/redeem/redeem.py new file mode 100644 index 00000000..8564f91a --- /dev/null +++ b/plugins/genshin/redeem/redeem.py @@ -0,0 +1,100 @@ +import asyncio +import time +from typing import List + +from telegram import Update +from telegram.error import BadRequest +from telegram.ext import CallbackContext +from telegram.ext import filters + +from core.plugin import Plugin, handler +from gram_core.services.users.services import UserAdminService +from plugins.genshin.redeem.runner import RedeemRunner, RedeemResult, RedeemQueueFull +from plugins.tools.genshin import GenshinHelper +from utils.log import logger + + +REDEEM_TEXT = """#### 兑换结果 #### +时间:{} (UTC+8) +UID: {} +兑换码:{} +兑换结果:{}""" + + +class Redeem(Plugin): + """兑换码兑换""" + + def __init__( + self, + genshin_helper: GenshinHelper, + user_admin_service: UserAdminService, + ): + self.genshin_helper = genshin_helper + self.user_admin_service = user_admin_service + self.max_code_in_pri_message = 5 + self.max_code_in_pub_message = 3 + self.redeem_runner = RedeemRunner(genshin_helper) + + async def _callback(self, data: "RedeemResult") -> None: + code = data.code + uid = data.uid if data.uid else "未知" + msg = data.error if data.error else "成功" + today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + + text = REDEEM_TEXT.format(today, uid, code, msg) + reply_message = None + try: + reply_message = await data.message.edit_text(text) + except BadRequest: + try: + reply_message = await data.message.reply_text(text) + except BadRequest: + pass + if reply_message and filters.ChatType.GROUPS.filter(reply_message): + self.add_delete_message_job(reply_message) + + async def redeem_one_code(self, update: Update, user_id: int, uid: int, code: str): + if not code: + return + message = update.effective_message + reply_message = await message.reply_text("正在兑换中,请稍等") + + task_data = RedeemResult(user_id=user_id, code=code, uid=uid, message=reply_message) + priority = 1 if await self.user_admin_service.is_admin(user_id) else 2 + try: + await self.redeem_runner.run(task_data, self._callback, priority) + except RedeemQueueFull: + await reply_message.edit_text("兑换队列已满,请稍后再试") + if filters.ChatType.GROUPS.filter(reply_message): + self.add_delete_message_job(reply_message) + + async def redeem_codes(self, update: Update, user_id: int, codes: List[str]): + async with self.genshin_helper.genshin(user_id) as client: + uid = client.player_id + tasks = [] + for code in codes: + tasks.append(self.redeem_one_code(update, user_id, uid, code)) + await asyncio.gather(*tasks) + + @handler.command(command="redeem", cookie=True, block=False) + @handler.message(filters=filters.Regex("^兑换码兑换(.*)"), cookie=True, block=False) + async def command_start(self, update: Update, context: CallbackContext) -> None: + user_id = await self.get_real_user_id(update) + message = update.effective_message + limit = self.max_code_in_pri_message + if filters.ChatType.GROUPS.filter(message): + self.add_delete_message_job(message) + limit = self.max_code_in_pub_message + codes = [i for i in self.get_args(context) if i][:limit] + self.log_user(update, logger.info, "兑换码兑换命令请求 codes[%s]", codes) + if not codes: + return + await self.redeem_codes(update, user_id, codes) + + @handler.command(command="start", filters=filters.Regex(r" redeem_(.*)"), block=False) + async def start_redeem(self, update: Update, context: CallbackContext) -> None: + user = update.effective_user + args = self.get_args(context) + codes = [i for i in args[0].split("_")[1:] if i][: self.max_code_in_pri_message] + logger.info("用户 %s[%s] 通过start命令 进入兑换码兑换流程 codes[%s]", user.full_name, user.id, codes) + await self.redeem_codes(update, user.id, codes) diff --git a/plugins/genshin/redeem/runner.py b/plugins/genshin/redeem/runner.py new file mode 100644 index 00000000..aa1f012b --- /dev/null +++ b/plugins/genshin/redeem/runner.py @@ -0,0 +1,92 @@ +import asyncio +from dataclasses import dataclass +from queue import PriorityQueue +from typing import Coroutine, Any, Optional, List, TYPE_CHECKING, Union + +from simnet.errors import RegionNotSupported, RedemptionInvalid, RedemptionClaimed, RedemptionCooldown +from telegram import Message + +from plugins.tools.genshin import GenshinHelper + +if TYPE_CHECKING: + from simnet import GenshinClient + + +@dataclass +class RedeemResult: + user_id: int + code: str + message: Message + error: Optional[str] = None + uid: Optional[int] = 0 + + +class RedeemRunnerTask: + def __init__(self, task: Coroutine[Any, Any, None]): + self.task = task + + def __lt__(self, other: "RedeemRunnerTask") -> bool: + return False + + async def run(self) -> None: + await self.task + + +class RedeemQueueFull(Exception): + pass + + +class RedeemRunner: + def __init__(self, genshin_helper: GenshinHelper): + self.gcsim_version: Optional[str] = None + self.sema = asyncio.BoundedSemaphore(1) + self.queue_size = 21 + self.queue: PriorityQueue[List[Union[int, RedeemRunnerTask]]] = PriorityQueue(maxsize=self.queue_size) + self.genshin_helper = genshin_helper + + @staticmethod + async def _execute_queue( + redeem_task: Coroutine[Any, Any, RedeemResult], + callback_task: "(result: RedeemResult) -> Coroutine[Any, Any, None]", + ) -> None: + data = await redeem_task + await callback_task(data) + + async def run( + self, + data: RedeemResult, + callback_task: "(result: RedeemResult) -> Coroutine[Any, Any, None]", + priority: int = 2, + ) -> None: + redeem_task = self.redeem_code(data) + queue_task = RedeemRunnerTask(self._execute_queue(redeem_task, callback_task)) + if priority == 2 and self.queue.qsize() >= (self.queue_size - 1): + raise RedeemQueueFull() + if self.queue.full(): + raise RedeemQueueFull() + self.queue.put([priority, queue_task]) + async with self.sema: + if not self.queue.empty(): + _, task = self.queue.get() + await task.run() + await asyncio.sleep(5) + + async def redeem_code(self, result: RedeemResult) -> RedeemResult: + error = None + try: + async with self.genshin_helper.genshin(result.user_id) as client: + client: "GenshinClient" + result.uid = client.player_id + await client.redeem_code_by_hoyolab(result.code) + except RegionNotSupported: + error = "此服务器暂不支持进行兑换哦~" + except RedemptionInvalid: + error = "兑换码格式不正确,请确认。" + except RedemptionClaimed: + error = "此兑换码已经兑换过了。" + except RedemptionCooldown as e: + error = e.message + except Exception as e: + error = str(e)[:500] + result.error = error + return result