From e1f663b1deaf57ea4e1607553e369ff0e419f6dd Mon Sep 17 00:00:00 2001 From: xtaodada Date: Tue, 19 Nov 2024 22:21:48 +0800 Subject: [PATCH] :sparkles: Support wish waiting list --- plugins/admin/set_command.py | 1 + plugins/tools/player_info.py | 7 +- plugins/zzz/signal_waiting_list.py | 247 ++++++++++++++++++ .../zzz/gacha_log/wish_waiting_list.jinja2 | 71 +++++ 4 files changed, 325 insertions(+), 1 deletion(-) create mode 100644 plugins/zzz/signal_waiting_list.py create mode 100644 resources/zzz/gacha_log/wish_waiting_list.jinja2 diff --git a/plugins/admin/set_command.py b/plugins/admin/set_command.py index dc49db5..813cf32 100644 --- a/plugins/admin/set_command.py +++ b/plugins/admin/set_command.py @@ -43,6 +43,7 @@ class SetCommandPlugin(Plugin): BotCommand("signal_log", "查看调频记录"), BotCommand("signal_log_online_view", "调频记录在线浏览"), BotCommand("signal_log_rank", "抽卡排行榜"), + BotCommand("signal_waiting_list", "未复刻列表"), BotCommand("action_log", "查询登录记录"), BotCommand("dailynote", "查询实时便笺"), BotCommand("redeem", "(国际服)兑换 Key"), diff --git a/plugins/tools/player_info.py b/plugins/tools/player_info.py index 92a6446..91d3738 100644 --- a/plugins/tools/player_info.py +++ b/plugins/tools/player_info.py @@ -117,5 +117,10 @@ class PlayerInfoSystem(Plugin): await self.set_form_cache(base_info) return base_info - async def get_theme_info(self, player_id: int) -> PlayerAvatarInfo: + async def get_theme_info(self, player_id: Optional[int], user_id: Optional[int] = None) -> PlayerAvatarInfo: + if not player_id and user_id is not None: + player = await self.player_service.get_player(user_id) + if player is None: + return self.get_base_avatar_info(0, "") + player_id = player.player_id return await self.get_player_info(player_id, "") diff --git a/plugins/zzz/signal_waiting_list.py b/plugins/zzz/signal_waiting_list.py new file mode 100644 index 0000000..062d8d0 --- /dev/null +++ b/plugins/zzz/signal_waiting_list.py @@ -0,0 +1,247 @@ +import asyncio +import math +from datetime import datetime +from functools import partial +from typing import Dict, Tuple, List, TYPE_CHECKING, Optional + +from pydantic import BaseModel + +from telegram import InlineKeyboardButton, InlineKeyboardMarkup +from telegram.constants import ChatAction +from telegram.ext import filters + +from core.dependence.assets import AssetsService, AssetsCouldNotFound +from gram_core.config import config +from gram_core.plugin import Plugin, handler +from gram_core.plugin.methods.inline_use_data import IInlineUseData +from gram_core.services.template.services import TemplateService + +from metadata.pool.pool import POOL_2 as CHARACTER_POOL, POOL_3 as WEAPON_POOL +from plugins.tools.player_info import PlayerInfoSystem +from utils.log import logger + +if TYPE_CHECKING: + from telegram import Update + from telegram.ext import ContextTypes + + from gram_core.services.template.models import RenderResult + +MIX_POOL = [] + + +class WishWaitingListData(BaseModel): + name: str + """名称""" + icon: Optional[str] = None + """图标""" + up_times: int + """总共 UP 次数""" + last_up_time: datetime + """上一次 UP 时间""" + last_up_day: int + """距离上一次 UP 多少天""" + + +class WishWaitingListPlugin(Plugin): + """未复刻列表""" + + def __init__( + self, + assets: AssetsService, + template_service: TemplateService, + player_info: PlayerInfoSystem, + ): + self.assets_service = assets + self.template_service = template_service + self.player_info = player_info + self.waiting_list = {} + + async def initialize(self) -> None: + asyncio.create_task(self.init_data()) + + async def init_data(self): + now = datetime.now() + if self.waiting_list and (now - self.waiting_list["time"]).total_seconds() < 3600: + return + data = { + "avatar": await self._get_waiting_list(CHARACTER_POOL, "avatar", self.assets_service.avatar.normal), + "weapon": await self._get_waiting_list(WEAPON_POOL, "weapon", self.assets_service.weapon.icon), + "time": now, + } + self.waiting_list.update(data) + + @staticmethod + async def _ignore_static_pool(pool_type: str): + if pool_type == "avatar": + return ["莱卡恩", "猫又", "格莉丝", "丽娜", "「11号」", "珂蕾妲"] + return [] + + @staticmethod + async def _ignore_mix_pool( + pool_type: str, + five_times: Dict[str, WishWaitingListData], + four_times: Dict[str, WishWaitingListData], + ): + now = datetime.now() + for p in MIX_POOL: + does = p[pool_type] + last_up_time = datetime.strptime(p["to"], "%Y-%m-%d %H:%M:%S") + last_up_day = math.ceil((now - last_up_time).total_seconds() / 86400) + for do in does: + t: WishWaitingListData = five_times.get(do) or four_times.get(do) + if not t: + continue + t.up_times += 1 + if t.last_up_time < last_up_time: + t.last_up_time = last_up_time + t.last_up_day = last_up_day + + async def _get_waiting_list( + self, + pool, + pool_type: str, + assets, + ) -> Tuple[Dict[str, WishWaitingListData], List[str], Dict[str, WishWaitingListData], List[str]]: + now = datetime.now() + five_times: Dict[str, WishWaitingListData] = {} + five_data = [] + four_times: Dict[str, WishWaitingListData] = {} + four_data = [] + ignore = await self._ignore_static_pool(pool_type) + for p in pool: + fives = p["five"] + fours = p["four"] + last_up_time = datetime.strptime(p["to"], "%Y-%m-%d %H:%M:%S") + last_up_day = max(math.ceil((now - last_up_time).total_seconds() / 86400), 0) + for i, times in [(fives, five_times), (fours, four_times)]: + for n in i: + if n in ignore: + continue + if n in times: + times[n].up_times += 1 + else: + try: + icon = assets(n).as_uri() + except (AssetsCouldNotFound, AttributeError): + icon = "" + times[n] = WishWaitingListData( + name=n, + icon=icon, + up_times=1, + last_up_time=last_up_time, + last_up_day=last_up_day, + ) + await self._ignore_mix_pool(pool_type, five_times, four_times) + for times, data in [(five_times, five_data), (four_times, four_data)]: + data.clear() + data.extend(list(times.keys())) + data.sort(key=lambda j: times[j].last_up_day, reverse=True) # pylint: disable=W0640 + return five_times, five_data, four_times, four_data + + @handler.command("signal_waiting_list", block=False) + @handler.message(filters=filters.Regex(r"^未复刻列表?(角色|武器|)$"), block=False) + async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: + user_id = await self.get_real_user_id(update) + message = update.effective_message + is_avatar = True + if args := self.get_args(context): + if "角色" in args: + is_avatar = True + elif "武器" in args: + is_avatar = False + self.log_user(update, logger.info, "查询未复刻列表 is_avatar[%s]", is_avatar) + await message.reply_chat_action(ChatAction.TYPING) + image = await self.render(user_id, is_avatar) + await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) + await image.reply_photo(message, reply_markup=await self.get_wish_waiting_list_button(user_id, is_avatar)) + + async def render(self, user_id: int, is_avatar: bool) -> "RenderResult": + await self.init_data() + _data = self.waiting_list["avatar" if is_avatar else "weapon"] + data = { + "fiveLog": _data[1], + "fiveData": _data[0], + "fourLog": _data[3], + "fourData": _data[2], + } + name_card = (await self.player_info.get_theme_info(None, user_id)).name_card + data["background"] = name_card + return await self.template_service.render( + "zzz/gacha_log/wish_waiting_list.jinja2", + data, + full_page=True, + query_selector=".body_box", + ) + + @staticmethod + async def get_wish_waiting_list_button(user_id: int, is_avatar: bool): + return InlineKeyboardMarkup( + [ + [ + ( + InlineKeyboardButton( + ">> 切换到武器池 <<", callback_data=f"get_wish_waiting_list|{user_id}|weapon" + ) + if is_avatar + else InlineKeyboardButton( + ">> 切换到角色池 <<", callback_data=f"get_wish_waiting_list|{user_id}|avatar" + ) + ) + ] + ] + ) + + @handler.callback_query(pattern=r"^get_wish_waiting_list\|", block=False) + async def get_wish_waiting_list(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + message = callback_query.message + + async def get_wish_waiting_list_callback( + callback_query_data: str, + ) -> Tuple[str, int]: + _data = callback_query_data.split("|") + _user_id = int(_data[1]) + _result = _data[2] + logger.debug( + "callback_query_data函数返回 result[%s] user_id[%s]", + _result, + _user_id, + ) + return _result, _user_id + + try: + pool_type, user_id = await get_wish_waiting_list_callback(callback_query.data) + except IndexError: + await callback_query.answer("按钮数据已过期,请重新获取。", show_alert=True) + self.add_delete_message_job(message, delay=1) + return + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + is_avatar = pool_type == "avatar" + await self._get_wish_waiting_list(update, context, is_avatar) + + async def _get_wish_waiting_list(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE", is_avatar: bool) -> None: + callback_query = update.callback_query + user = callback_query.from_user + user_id = user.id + + image = await self.render(user_id, is_avatar) + await image.edit_inline_media( + callback_query, + reply_markup=await self.get_wish_waiting_list_button(user_id, is_avatar), + ) + + async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]: + types = {"角色": "avatar", "武器": "weapon"} + data = [] + for k, v in types.items(): + data.append( + IInlineUseData( + text=f"未复刻列表 - {k}池", + hash=f"wish_waiting_list_{v}", + callback=partial(self._get_wish_waiting_list, is_avatar=v == "avatar"), + ) + ) + return data diff --git a/resources/zzz/gacha_log/wish_waiting_list.jinja2 b/resources/zzz/gacha_log/wish_waiting_list.jinja2 new file mode 100644 index 0000000..617c24f --- /dev/null +++ b/resources/zzz/gacha_log/wish_waiting_list.jinja2 @@ -0,0 +1,71 @@ + + + + + + + + Title + + +
+
+
+
+
+

未复刻列表

+

+ 角色池 +

+
+
+
+ +
+
+ + 五星列表 + +
+
+ {% for val in fiveLog %} + {% set d = fiveData[val] %} +
+ {% if d.last_up_day == 0 %} + UP + {% endif %} + +
{{ d.last_up_day }}
+
+ {% endfor %} +
+
+ + 四星列表 + +
+
+ {% for val in fourLog %} + {% set d = fourData[val] %} +
+ {% if d.last_up_day == 0 %} + UP + {% endif %} + +
{{ d.last_up_day }}
+
+ {% endfor %} +
+
+ +
+
+ +