diff --git a/metadata/pool/pool_301.py b/metadata/pool/pool_301.py index 3b547aa7..551721f6 100644 --- a/metadata/pool/pool_301.py +++ b/metadata/pool/pool_301.py @@ -273,7 +273,7 @@ POOL_301 = [ "to": "2022-10-14 17:59:59", }, { - "five": ["甘雨", "心海"], + "five": ["甘雨", "珊瑚宫心海"], "four": ["行秋", "砂糖", "多莉"], "from": "2022-09-09 18:00:00", "name": "浮生孰来|浮岳虹珠", diff --git a/metadata/pool/pool_500.py b/metadata/pool/pool_500.py index 2ee7bc19..472f27f7 100644 --- a/metadata/pool/pool_500.py +++ b/metadata/pool/pool_500.py @@ -2,6 +2,43 @@ POOL_500 = [ { "five": ["晨风之诗"], "four": [], + "avatar": [ + "琴", + "莫娜", + "迪卢克", + "可莉", + "阿贝多", + "优菈", + "米卡", + "罗莎莉亚", + "砂糖", + "迪奥娜", + "诺艾尔", + "班尼特", + "菲谢尔", + "安柏", + "雷泽", + "凯亚", + "芭芭拉", + "丽莎", + ], + "weapon": [ + "天空之翼", + "天空之卷", + "天空之脊", + "天空之傲", + "天空之刃", + "四风原典", + "狼的末路", + "风鹰剑", + "松籁响起之时", + "猎人之径", + "苇海信标", + "暗巷闪光", + "暗巷的酒与诗", + "幽夜华尔兹", + "暗巷猎手", + ], "name": "晨风之诗", "from": "2024-03-13 06:00:00", "to": "2024-04-02 17:59:59", diff --git a/plugins/admin/set_command.py b/plugins/admin/set_command.py index e664d80d..1e173573 100644 --- a/plugins/admin/set_command.py +++ b/plugins/admin/set_command.py @@ -28,6 +28,7 @@ class SetCommandPlugin(Plugin): BotCommand("wish_log_delete", "删除抽卡记录"), BotCommand("wish_log_online_view", "抽卡记录在线浏览"), BotCommand("wish_log_rank", "抽卡排行榜"), + BotCommand("wish_waiting_list", "未复刻列表"), BotCommand("pay_log", "查看充值记录"), BotCommand("pay_log_import", "导入充值记录"), BotCommand("pay_log_export", "导出充值记录"), diff --git a/plugins/genshin/wish_waiting_list.py b/plugins/genshin/wish_waiting_list.py new file mode 100644 index 00000000..0e9489ee --- /dev/null +++ b/plugins/genshin/wish_waiting_list.py @@ -0,0 +1,245 @@ +import asyncio +import math +from datetime import datetime +from functools import partial +from typing import Dict, Tuple, List, TYPE_CHECKING, Optional + +from pydantic import BaseModel + +from telegram import InlineKeyboardButton, InlineKeyboardMarkup +from telegram.constants import ChatAction +from telegram.ext import filters + +from core.dependence.assets import AssetsService, AssetsCouldNotFound +from gram_core.config import config +from gram_core.plugin import Plugin, handler +from gram_core.plugin.methods.inline_use_data import IInlineUseData +from gram_core.services.template.services import TemplateService + +from metadata.pool.pool import POOL_301 as CHARACTER_POOL, POOL_302 as WEAPON_POOL, POOL_500 as MIX_POOL +from plugins.tools.player_info import PlayerInfoSystem +from utils.log import logger + +if TYPE_CHECKING: + from telegram import Update + from telegram.ext import ContextTypes + + from gram_core.services.template.models import RenderResult + + +class WishWaitingListData(BaseModel): + name: str + """名称""" + icon: Optional[str] = None + """图标""" + up_times: int + """总共 UP 次数""" + last_up_time: datetime + """上一次 UP 时间""" + last_up_day: int + """距离上一次 UP 多少天""" + + +class WishWaitingListPlugin(Plugin): + """未复刻列表""" + + def __init__( + self, + assets: AssetsService, + template_service: TemplateService, + player_info: PlayerInfoSystem, + ): + self.assets_service = assets + self.template_service = template_service + self.player_info = player_info + self.waiting_list = {} + + async def initialize(self) -> None: + asyncio.create_task(self.init_data()) + + async def init_data(self): + now = datetime.now() + if self.waiting_list and (now - self.waiting_list["time"]).total_seconds() < 3600: + return + data = { + "avatar": await self._get_waiting_list(CHARACTER_POOL, "avatar", self.assets_service.avatar), + "weapon": await self._get_waiting_list(WEAPON_POOL, "weapon", self.assets_service.weapon), + "time": now, + } + self.waiting_list.update(data) + + @staticmethod + async def _ignore_static_pool(pool_type: str): + if pool_type == "avatar": + return ["莫娜", "七七", "迪卢克", "琴", "提纳里", "刻晴", "迪希雅"] + return [] + + @staticmethod + async def _ignore_mix_pool( + pool_type: str, + five_times: Dict[str, WishWaitingListData], + four_times: Dict[str, WishWaitingListData], + ): + now = datetime.now() + for p in MIX_POOL: + does = p[pool_type] + last_up_time = datetime.strptime(p["to"], "%Y-%m-%d %H:%M:%S") + last_up_day = math.ceil((now - last_up_time).total_seconds() / 86400) + for do in does: + t: WishWaitingListData = five_times.get(do) or four_times.get(do) + if not t: + continue + t.up_times += 1 + if t.last_up_time < last_up_time: + t.last_up_time = last_up_time + t.last_up_day = last_up_day + + async def _get_waiting_list( + self, + pool, + pool_type: str, + assets, + ) -> Tuple[Dict[str, WishWaitingListData], List[str], Dict[str, WishWaitingListData], List[str]]: + now = datetime.now() + five_times: Dict[str, WishWaitingListData] = {} + five_data = [] + four_times: Dict[str, WishWaitingListData] = {} + four_data = [] + ignore = await self._ignore_static_pool(pool_type) + for p in pool: + fives = p["five"] + fours = p["four"] + last_up_time = datetime.strptime(p["to"], "%Y-%m-%d %H:%M:%S") + last_up_day = max(math.ceil((now - last_up_time).total_seconds() / 86400), 0) + for i, times in [(fives, five_times), (fours, four_times)]: + for n in i: + if n in ignore: + continue + if n in times: + times[n].up_times += 1 + else: + try: + icon = (await assets(n).icon()).as_uri() + except AssetsCouldNotFound: + icon = "" + times[n] = WishWaitingListData( + name=n, + icon=icon, + up_times=1, + last_up_time=last_up_time, + last_up_day=last_up_day, + ) + await self._ignore_mix_pool(pool_type, five_times, four_times) + for times, data in [(five_times, five_data), (four_times, four_data)]: + data.clear() + data.extend(list(times.keys())) + data.sort(key=lambda j: times[j].last_up_day, reverse=True) # pylint: disable=W0640 + return five_times, five_data, four_times, four_data + + @handler.command("wish_waiting_list", block=False) + @handler.message(filters=filters.Regex(r"^未复刻列表?(角色|武器|)$"), block=False) + async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: + user_id = await self.get_real_user_id(update) + message = update.effective_message + is_avatar = True + if args := self.get_args(context): + if "角色" in args: + is_avatar = True + elif "武器" in args: + is_avatar = False + self.log_user(update, logger.info, "查询未复刻列表 is_avatar[%s]", is_avatar) + await message.reply_chat_action(ChatAction.TYPING) + image = await self.render(user_id, is_avatar) + await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) + await image.reply_photo(message, reply_markup=await self.get_wish_waiting_list_button(user_id, is_avatar)) + + async def render(self, user_id: int, is_avatar: bool) -> "RenderResult": + await self.init_data() + _data = self.waiting_list["avatar" if is_avatar else "weapon"] + data = { + "fiveLog": _data[1], + "fiveData": _data[0], + "fourLog": _data[3], + "fourData": _data[2], + } + name_card = await self.player_info.get_name_card(None, user_id) + data["name_card"] = name_card + return await self.template_service.render( + "genshin/wish_log/wish_waiting_list.jinja2", + data, + full_page=True, + query_selector=".body_box", + ) + + @staticmethod + async def get_wish_waiting_list_button(user_id: int, is_avatar: bool): + return InlineKeyboardMarkup( + [ + [ + ( + InlineKeyboardButton( + ">> 切换到武器池 <<", callback_data=f"get_wish_waiting_list|{user_id}|weapon" + ) + if is_avatar + else InlineKeyboardButton( + ">> 切换到角色池 <<", callback_data=f"get_wish_waiting_list|{user_id}|avatar" + ) + ) + ] + ] + ) + + @handler.callback_query(pattern=r"^get_wish_waiting_list\|", block=False) + async def get_wish_waiting_list(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + message = callback_query.message + + async def get_wish_waiting_list_callback( + callback_query_data: str, + ) -> Tuple[str, int]: + _data = callback_query_data.split("|") + _user_id = int(_data[1]) + _result = _data[2] + logger.debug( + "callback_query_data函数返回 result[%s] user_id[%s]", + _result, + _user_id, + ) + return _result, _user_id + + try: + pool_type, user_id = await get_wish_waiting_list_callback(callback_query.data) + except IndexError: + await callback_query.answer("按钮数据已过期,请重新获取。", show_alert=True) + self.add_delete_message_job(message, delay=1) + return + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + is_avatar = pool_type == "avatar" + await self._get_wish_waiting_list(update, context, is_avatar) + + async def _get_wish_waiting_list(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE", is_avatar: bool) -> None: + callback_query = update.callback_query + user = callback_query.from_user + user_id = user.id + + image = await self.render(user_id, is_avatar) + await image.edit_inline_media( + callback_query, + reply_markup=await self.get_wish_waiting_list_button(user_id, is_avatar), + ) + + async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]: + types = {"角色": "avatar", "武器": "weapon"} + data = [] + for k, v in types.items(): + data.append( + IInlineUseData( + text=f"未复刻列表 - {k}池", + hash=f"wish_waiting_list_{v}", + callback=partial(self._get_wish_waiting_list, is_avatar=v == "avatar"), + ) + ) + return data diff --git a/plugins/tools/player_info.py b/plugins/tools/player_info.py index 88639859..c2550876 100644 --- a/plugins/tools/player_info.py +++ b/plugins/tools/player_info.py @@ -51,15 +51,16 @@ class PlayerInfoSystem(Plugin): avatar = (await self.assets_service.avatar(0).icon()).as_uri() return name_card, avatar, nickname, rarity - async def get_name_card(self, player_id: int, user_id: int): - player = await self.player_service.get(user_id, player_id) - player_info = await self.player_info_service.get(player) + async def get_name_card(self, player_id: Optional[int], user_id: Optional[int]): name_card: Optional[str] = None - try: - if player_info is not None and player_info.name_card is not None: - name_card = (await self.assets_service.namecard(int(player_info.name_card)).navbar()).as_uri() - except Exception as exc: # pylint: disable=W0703 - logger.error("卡片信息请求失败 %s", str(exc)) + player = await self.player_service.get(user_id, player_id) + if player: + player_info = await self.player_info_service.get(player) + try: + if player_info is not None and player_info.name_card is not None: + name_card = (await self.assets_service.namecard(int(player_info.name_card)).navbar()).as_uri() + except Exception as exc: # pylint: disable=W0703 + logger.error("卡片信息请求失败 %s", str(exc)) if name_card is None: # 默认 name_card = (await self.assets_service.namecard(0).navbar()).as_uri() return name_card diff --git a/resources/genshin/wish_log/wish_waiting_list.jinja2 b/resources/genshin/wish_log/wish_waiting_list.jinja2 new file mode 100644 index 00000000..be1dcf01 --- /dev/null +++ b/resources/genshin/wish_log/wish_waiting_list.jinja2 @@ -0,0 +1,75 @@ + + +
+ + + + + + + +