diff --git a/alembic/versions/87c6195e5306_history_data.py b/alembic/versions/87c6195e5306_history_data.py new file mode 100644 index 0000000..70b7c13 --- /dev/null +++ b/alembic/versions/87c6195e5306_history_data.py @@ -0,0 +1,52 @@ +"""history_data + +Revision ID: 87c6195e5306 +Revises: 369fb74daad9 +Create Date: 2024-04-26 22:57:42.309397 + +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = "87c6195e5306" +down_revision = "369fb74daad9" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "history_data", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("user_id", sa.BigInteger(), nullable=False), + sa.Column("data_id", sa.BigInteger(), nullable=True), + sa.Column( + "time_created", + sa.DateTime(), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column("time_updated", sa.DateTime(), nullable=True), + sa.Column("type", sa.Integer(), nullable=False), + sa.Column("data", sa.JSON(), nullable=True), + sa.PrimaryKeyConstraint("id", "user_id", "type"), + mysql_charset="utf8mb4", + mysql_collate="utf8mb4_general_ci", + ) + op.create_index( + op.f("ix_history_data_user_id"), + "history_data", + ["user_id"], + unique=False, + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f("ix_history_data_user_id"), table_name="history_data") + op.drop_table("history_data") + # ### end Alembic commands ### diff --git a/core/services/history_data/__init__.py b/core/services/history_data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/core/services/history_data/models.py b/core/services/history_data/models.py new file mode 100644 index 0000000..5090673 --- /dev/null +++ b/core/services/history_data/models.py @@ -0,0 +1,36 @@ +import enum + +from pydantic import BaseModel +from simnet.models.starrail.chronicle.challenge import StarRailChallenge +from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup + +from gram_core.services.history_data.models import HistoryData + +__all__ = ( + "HistoryData", + "HistoryDataTypeEnum", + "HistoryDataAbyss", + "HistoryDataChallengeStory", +) + + +class HistoryDataTypeEnum(int, enum.Enum): + ABYSS = 0 # 混沌回忆 + CHALLENGE_STORY = 1 # 虚构叙事 + + +class HistoryDataAbyss(BaseModel): + abyss_data: StarRailChallenge + + @classmethod + def from_data(cls, data: HistoryData) -> "HistoryDataAbyss": + return cls.parse_obj(data.data) + + +class HistoryDataChallengeStory(BaseModel): + story_data: StarRailChallengeStory + group: StarRailChallengeStoryGroup + + @classmethod + def from_data(cls, data: HistoryData) -> "HistoryDataChallengeStory": + return cls.parse_obj(data.data) diff --git a/core/services/history_data/repositories.py b/core/services/history_data/repositories.py new file mode 100644 index 0000000..d48e8b2 --- /dev/null +++ b/core/services/history_data/repositories.py @@ -0,0 +1,3 @@ +from gram_core.services.history_data.repositories import HistoryDataRepository + +__all__ = ("HistoryDataRepository",) diff --git a/core/services/history_data/services.py b/core/services/history_data/services.py new file mode 100644 index 0000000..a88b80f --- /dev/null +++ b/core/services/history_data/services.py @@ -0,0 +1,77 @@ +import datetime +from typing import List + +from pytz import timezone +from simnet.models.starrail.chronicle.challenge import StarRailChallenge +from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup + +from core.services.history_data.models import ( + HistoryData, + HistoryDataTypeEnum, + HistoryDataAbyss, + HistoryDataChallengeStory, +) +from gram_core.base_service import BaseService +from gram_core.services.history_data.services import HistoryDataBaseServices + +try: + import ujson as jsonlib +except ImportError: + import json as jsonlib + + +__all__ = ( + "HistoryDataBaseServices", + "HistoryDataAbyssServices", + "HistoryDataChallengeStoryServices", +) + +TZ = timezone("Asia/Shanghai") + + +def json_encoder(value): + if isinstance(value, datetime.datetime): + return value.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S") + return value + + +class HistoryDataAbyssServices(BaseService, HistoryDataBaseServices): + DATA_TYPE = HistoryDataTypeEnum.ABYSS.value + + @staticmethod + def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool: + return any(d.data == data.data for d in old_data) + + @staticmethod + def create(user_id: int, abyss_data: StarRailChallenge): + data = HistoryDataAbyss(abyss_data=abyss_data) + json_data = data.json(by_alias=True, encoder=json_encoder) + return HistoryData( + user_id=user_id, + data_id=abyss_data.season, + time_created=datetime.datetime.now(), + type=HistoryDataAbyssServices.DATA_TYPE, + data=jsonlib.loads(json_data), + ) + + +class HistoryDataChallengeStoryServices(BaseService, HistoryDataBaseServices): + DATA_TYPE = HistoryDataTypeEnum.CHALLENGE_STORY.value + + @staticmethod + def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool: + return any(d.data == data.data for d in old_data) + + @staticmethod + def create(user_id: int, story_data: StarRailChallengeStory, group: StarRailChallengeStoryGroup): + data = HistoryDataChallengeStory(story_data=story_data, group=group) + json_data = data.json(by_alias=True, encoder=json_encoder) + dict_data = jsonlib.loads(json_data) + dict_data["story_data"]["groups"] = [] + return HistoryData( + user_id=user_id, + data_id=group.season, + time_created=datetime.datetime.now(), + type=HistoryDataChallengeStoryServices.DATA_TYPE, + data=dict_data, + ) diff --git a/gram_core b/gram_core index 14bc3c5..4817705 160000 --- a/gram_core +++ b/gram_core @@ -1 +1 @@ -Subproject commit 14bc3c5a191e13873cae57a5fe36d35641d51491 +Subproject commit 481770502884afbce7d6b38f0448fec9e749885e diff --git a/plugins/admin/set_command.py b/plugins/admin/set_command.py index ff15226..9ddf5d7 100644 --- a/plugins/admin/set_command.py +++ b/plugins/admin/set_command.py @@ -44,18 +44,11 @@ class SetCommandPlugin(Plugin): BotCommand("strategy", "角色攻略查询"), BotCommand("material", "角色培养素材查询"), BotCommand("challenge", "混沌回忆信息查询"), + BotCommand("challenge_history", "混沌回忆历史信息查询"), BotCommand("challenge_story", "虚构叙事信息查询"), + BotCommand("challenge_story_history", "虚构叙事历史信息查询"), BotCommand("rogue", "模拟宇宙信息查询"), BotCommand("rogue_locust", "寰宇蝗灾信息查询"), - BotCommand("museum", "冬城博物珍奇簿信息查询"), - BotCommand("fantastic_story", "评书奇谭信息查询"), - BotCommand("treasure_dungeon", "地城探宝信息查询"), - BotCommand("copper_man", "金人巷信息查询"), - BotCommand("yitai_battle", "以太战线信息查询"), - BotCommand("endless_side", "无尽位面信息查询"), - BotCommand("fox_story", "狐斋志异信息查询"), - BotCommand("boxing_show", "斗技表演赛信息查询"), - BotCommand("space_zoo", "异宠拾遗信息查询"), BotCommand("cookies_import", "从其他 BOT 导入账号信息"), BotCommand("cookies_export", "导出账号信息给其他 BOT"), ] diff --git a/plugins/starrail/challenge.py b/plugins/starrail/challenge.py index 3b64c44..ce284ef 100644 --- a/plugins/starrail/challenge.py +++ b/plugins/starrail/challenge.py @@ -1,22 +1,28 @@ """混沌回忆数据查询""" import asyncio +import math import re from functools import lru_cache from typing import Any, List, Optional, Tuple, Union, TYPE_CHECKING from arkowrapper import ArkoWrapper from pytz import timezone -from telegram import Message, Update +from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.constants import ChatAction, ParseMode -from telegram.ext import CallbackContext, filters +from telegram.ext import CallbackContext, filters, ContextTypes from core.dependence.assets import AssetsService from core.plugin import Plugin, handler from core.services.cookies.error import TooManyRequestPublicCookies +from core.services.history_data.models import HistoryDataAbyss +from core.services.history_data.services import HistoryDataAbyssServices from core.services.template.models import RenderGroupResult, RenderResult from core.services.template.services import TemplateService +from gram_core.config import config +from gram_core.dependence.redisdb import RedisDB from plugins.tools.genshin import GenshinHelper +from utils.enkanetwork import RedisCache from utils.log import logger from utils.uid import mask_number @@ -68,10 +74,14 @@ class ChallengePlugin(Plugin): template: TemplateService, helper: GenshinHelper, assets_service: AssetsService, + history_data_abyss: HistoryDataAbyssServices, + redis: RedisDB, ): self.template_service = template self.helper = helper self.assets_service = assets_service + self.history_data_abyss = history_data_abyss + self.cache = RedisCache(redis.client, key="plugin:challenge:history") async def get_uid(self, user_id: int, args: List[str], reply: Optional[Message]) -> int: """通过消息获取 uid,优先级:args > reply > self""" @@ -149,7 +159,8 @@ class ChallengePlugin(Plugin): if total: reply_text = await message.reply_text("彦卿需要时间整理混沌回忆数据,还请耐心等待哦~") await message.reply_chat_action(ChatAction.TYPING) - images = await self.get_rendered_pic(client, uid, floor, total, previous) + abyss_data = await self.get_rendered_pic_data(client, uid, previous) + images = await self.get_rendered_pic(abyss_data, uid, floor, total) except TooManyRequestPublicCookies: reply_message = await message.reply_text("查询次数太多,请您稍后重试") if filters.ChatType.GROUPS.filter(message): @@ -199,8 +210,14 @@ class ChallengePlugin(Plugin): } return render_data - async def get_rendered_pic( - self, client: "StarRailClient", uid: int, floor: int, total: bool, previous: bool + async def get_rendered_pic_data(self, client: "StarRailClient", uid: int, previous: bool) -> "StarRailChallenge": + abyss_data = await client.get_starrail_challenge(uid, previous=previous, lang="zh-cn") + if abyss_data.has_data: + await self.save_abyss_data(uid, abyss_data) + return abyss_data + + async def get_rendered_pic( # skipcq: PY-R1000 # + self, abyss_data: "StarRailChallenge", uid: int, floor: int, total: bool ) -> Union[ Tuple[ Union[BaseException, Any], @@ -216,17 +233,15 @@ class ChallengePlugin(Plugin): 获取渲染后的图片 Args: - client (Client): 获取 genshin 数据的 client + abyss_data (StarRailChallenge): 混沌回忆数据 uid (int): 需要查询的 uid floor (int): 层数 total (bool): 是否为总览 - previous (bool): 是否为上期 Returns: bytes格式的图片 """ - abyss_data = await client.get_starrail_challenge(uid, previous=previous, lang="zh-cn") if not abyss_data.has_data: raise AbyssUnlocked() start_time = abyss_data.begin_time.datetime.astimezone(TZ).strftime("%m月%d日 %H:%M") @@ -309,3 +324,252 @@ class ChallengePlugin(Plugin): "starrail/abyss/floor.html", render_data, viewport={"width": 690, "height": 500} ) ] + + async def save_abyss_data(self, uid: int, abyss_data: "StarRailChallenge"): + model = self.history_data_abyss.create(uid, abyss_data) + old_data = await self.history_data_abyss.get_by_user_id_data_id(uid, model.data_id) + exists = self.history_data_abyss.exists_data(model, old_data) + if not exists: + await self.history_data_abyss.add(model) + + async def get_abyss_data(self, uid: int): + return await self.history_data_abyss.get_by_user_id(uid) + + @staticmethod + def get_season_data_name(data: "HistoryDataAbyss"): + start_time = data.abyss_data.begin_time.datetime.astimezone(TZ) + time = start_time.strftime("%Y.%m.%d") + honor = "" + if data.abyss_data.total_stars == 36: + if data.abyss_data.total_battles == 12: + honor = "👑" + last_battles = data.abyss_data.floors[-1] + num_of_characters = max( + len(last_battles.node_1.avatars), + len(last_battles.node_2.avatars), + ) + if num_of_characters == 2: + honor = "双通" + elif num_of_characters == 1: + honor = "单通" + + return f"{time} {data.abyss_data.total_stars} ★ {honor}" + + async def get_session_button_data(self, user_id: int, uid: int, force: bool = False): + redis = await self.cache.get(str(uid)) + if redis and not force: + return redis["buttons"] + data = await self.get_abyss_data(uid) + data.sort(key=lambda x: x.id, reverse=True) + abyss_data = [HistoryDataAbyss.from_data(i) for i in data] + buttons = [ + { + "name": self.get_season_data_name(abyss_data[idx]), + "value": f"get_abyss_history|{user_id}|{uid}|{value.id}", + } + for idx, value in enumerate(data) + ] + await self.cache.set(str(uid), {"buttons": buttons}) + return buttons + + async def gen_season_button( + self, + user_id: int, + uid: int, + page: int = 1, + ) -> List[List[InlineKeyboardButton]]: + """生成按钮""" + data = await self.get_session_button_data(user_id, uid) + if not data: + return [] + buttons = [ + InlineKeyboardButton( + value["name"], + callback_data=value["value"], + ) + for value in data + ] + all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)] + send_buttons = all_buttons[(page - 1) * 5 : page * 5] + last_page = page - 1 if page > 1 else 0 + all_page = math.ceil(len(all_buttons) / 5) + next_page = page + 1 if page < all_page and all_page > 1 else 0 + last_button = [] + if last_page: + last_button.append( + InlineKeyboardButton( + "<< 上一页", + callback_data=f"get_abyss_history|{user_id}|{uid}|p_{last_page}", + ) + ) + if last_page or next_page: + last_button.append( + InlineKeyboardButton( + f"{page}/{all_page}", + callback_data=f"get_abyss_history|{user_id}|{uid}|empty_data", + ) + ) + if next_page: + last_button.append( + InlineKeyboardButton( + "下一页 >>", + callback_data=f"get_abyss_history|{user_id}|{uid}|p_{next_page}", + ) + ) + if last_button: + send_buttons.append(last_button) + return send_buttons + + @staticmethod + async def gen_floor_button( + data_id: int, + abyss_data: "HistoryDataAbyss", + user_id: int, + uid: int, + ) -> List[List[InlineKeyboardButton]]: + max_floors = len(abyss_data.abyss_data.floors) + bypass_floors = len([i for i in abyss_data.abyss_data.floors if i.is_fast]) + buttons = [ + InlineKeyboardButton( + f"第 {i} 层", + callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|{i}", + ) + for i in range(bypass_floors + 1, max_floors + 1) + ] + send_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)] + all_buttons = [ + InlineKeyboardButton( + "<< 返回", + callback_data=f"get_abyss_history|{user_id}|{uid}|p_1", + ), + InlineKeyboardButton( + "总览", + callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|total", + ), + InlineKeyboardButton( + "所有", + callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|all", + ), + ] + send_buttons.append(all_buttons) + return send_buttons + + @handler.command("challenge_history", block=False) + @handler.message(filters.Regex(r"^混沌回忆历史数据"), block=False) + async def abyss_history_command_start(self, update: Update, context: CallbackContext) -> None: + user_id = await self.get_real_user_id(update) + message = update.effective_message + uid: int = await self.get_uid(user_id, context.args, message.reply_to_message) + self.log_user(update, logger.info, "查询混沌回忆历史数据 uid[%s]", uid) + + async with self.helper.genshin_or_public(user_id, uid=uid) as _: + await self.get_session_button_data(user_id, uid, force=True) + buttons = await self.gen_season_button(user_id, uid) + if not buttons: + await message.reply_text("还没有混沌回忆历史数据哦~") + return + await message.reply_text("请选择要查询的混沌回忆历史数据", reply_markup=InlineKeyboardMarkup(buttons)) + + async def get_abyss_history_page(self, update: "Update", user_id: int, uid: int, result: str): + """翻页处理""" + callback_query = update.callback_query + + self.log_user(update, logger.info, "切换混沌回忆历史数据页 page[%s]", result) + page = int(result.split("_")[1]) + async with self.helper.genshin_or_public(user_id) as _: + buttons = await self.gen_season_button(user_id, uid, page) + if not buttons: + await callback_query.answer("还没有混沌回忆历史数据哦~", show_alert=True) + await callback_query.edit_message_text("还没有混沌回忆历史数据哦~") + return + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + await callback_query.answer(f"已切换到第 {page} 页", show_alert=False) + + async def get_abyss_history_season(self, update: "Update", data_id: int): + """进入选择层数""" + callback_query = update.callback_query + user = callback_query.from_user + + self.log_user(update, logger.info, "切换混沌回忆历史数据到层数页 data_id[%s]", data_id) + data = await self.history_data_abyss.get_by_id(data_id) + if not data: + await callback_query.answer("数据不存在,请尝试重新发送命令~", show_alert=True) + await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~") + return + abyss_data = HistoryDataAbyss.from_data(data) + buttons = await self.gen_floor_button(data_id, abyss_data, user.id, data.user_id) + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + await callback_query.answer("已切换到层数页", show_alert=False) + + async def get_abyss_history_floor(self, update: "Update", data_id: int, detail: str): + """渲染层数数据""" + callback_query = update.callback_query + message = callback_query.message + + floor = 0 + total = False + if detail == "total": + floor = 0 + elif detail == "all": + total = True + else: + floor = int(detail) + data = await self.history_data_abyss.get_by_id(data_id) + if not data: + await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True) + await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~") + return + abyss_data = HistoryDataAbyss.from_data(data) + + images = await self.get_rendered_pic(abyss_data.abyss_data, data.user_id, floor, total) + if images is None: + await callback_query.answer(f"还没有第 {floor} 层的挑战数据", show_alert=True) + return + await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False) + + await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) + + for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组 + await RenderGroupResult(results=group).reply_media_group( + message, allow_sending_without_reply=True, write_timeout=60 + ) + self.log_user(update, logger.info, "[bold]混沌回忆挑战数据[/bold]: 成功发送图片", extra={"markup": True}) + self.add_delete_message_job(message, delay=1) + + @handler.callback_query(pattern=r"^get_abyss_history\|", block=False) + async def get_abyss_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + + async def get_abyss_history_callback( + callback_query_data: str, + ) -> Tuple[str, str, int, int]: + _data = callback_query_data.split("|") + _user_id = int(_data[1]) + _uid = int(_data[2]) + _result = _data[3] + _detail = _data[4] if len(_data) > 4 else None + logger.debug( + "callback_query_data函数返回 detail[%s] result[%s] user_id[%s] uid[%s]", + _detail, + _result, + _user_id, + _uid, + ) + return _detail, _result, _user_id, _uid + + detail, result, user_id, uid = await get_abyss_history_callback(callback_query.data) + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + if result == "empty_data": + await callback_query.answer(text="此按钮不可用", show_alert=True) + return + if result.startswith("p_"): + await self.get_abyss_history_page(update, user_id, uid, result) + return + data_id = int(result) + if detail: + await self.get_abyss_history_floor(update, data_id, detail) + return + await self.get_abyss_history_season(update, data_id) diff --git a/plugins/starrail/challenge_story.py b/plugins/starrail/challenge_story.py index c1ab06e..56e8790 100644 --- a/plugins/starrail/challenge_story.py +++ b/plugins/starrail/challenge_story.py @@ -1,22 +1,28 @@ """虚构叙事数据查询""" import asyncio +import math import re from functools import lru_cache from typing import Any, List, Optional, Tuple, Union, TYPE_CHECKING from arkowrapper import ArkoWrapper from pytz import timezone -from telegram import Message, Update +from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.constants import ChatAction, ParseMode -from telegram.ext import CallbackContext, filters +from telegram.ext import CallbackContext, filters, ContextTypes from core.dependence.assets import AssetsService from core.plugin import Plugin, handler from core.services.cookies.error import TooManyRequestPublicCookies +from core.services.history_data.models import HistoryDataChallengeStory +from core.services.history_data.services import HistoryDataChallengeStoryServices from core.services.template.models import RenderGroupResult, RenderResult from core.services.template.services import TemplateService +from gram_core.config import config +from gram_core.dependence.redisdb import RedisDB from plugins.tools.genshin import GenshinHelper +from utils.enkanetwork import RedisCache from utils.log import logger from utils.uid import mask_number @@ -28,8 +34,7 @@ except ImportError: if TYPE_CHECKING: from simnet import StarRailClient - from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory - + from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup TZ = timezone("Asia/Shanghai") cmd_pattern = r"(?i)^/challenge_story(?:@[\w]+)?\s*((?:\d+)|(?:all))?\s*(pre)?" @@ -68,10 +73,14 @@ class ChallengeStoryPlugin(Plugin): template: TemplateService, helper: GenshinHelper, assets_service: AssetsService, + history_data_abyss: HistoryDataChallengeStoryServices, + redis: RedisDB, ): self.template_service = template self.helper = helper self.assets_service = assets_service + self.history_data_abyss = history_data_abyss + self.cache = RedisCache(redis.client, key="plugin:challenge_story:history") async def get_uid(self, user_id: int, args: List[str], reply: Optional[Message]) -> int: """通过消息获取 uid,优先级:args > reply > self""" @@ -149,7 +158,8 @@ class ChallengeStoryPlugin(Plugin): if total: reply_text = await message.reply_text("彦卿需要时间整理虚构叙事数据,还请耐心等待哦~") await message.reply_chat_action(ChatAction.TYPING) - images = await self.get_rendered_pic(client, uid, floor, total, previous) + abyss_data, season = await self.get_rendered_pic_data(client, uid, previous) + images = await self.get_rendered_pic(abyss_data, season, uid, floor, total) except TooManyRequestPublicCookies: reply_message = await message.reply_text("查询次数太多,请您稍后重试") if filters.ChatType.GROUPS.filter(message): @@ -199,8 +209,23 @@ class ChallengeStoryPlugin(Plugin): } return render_data + async def get_rendered_pic_data( + self, client: "StarRailClient", uid: int, previous: bool + ) -> Tuple["StarRailChallengeStory", "StarRailChallengeStoryGroup"]: + abyss_data = await client.get_starrail_challenge_story(uid, previous=previous, lang="zh-cn") + group = None + if abyss_data.has_data and abyss_data.groups: + group = abyss_data.groups[1] if previous else abyss_data.groups[0] + await self.save_abyss_data(uid, abyss_data, group) + return abyss_data, group + async def get_rendered_pic( - self, client: "StarRailClient", uid: int, floor: int, total: bool, previous: bool + self, + abyss_data: "StarRailChallengeStory", + season: "StarRailChallengeStoryGroup", + uid: int, + floor: int, + total: bool, ) -> Union[ Tuple[ Union[BaseException, Any], @@ -216,22 +241,20 @@ class ChallengeStoryPlugin(Plugin): 获取渲染后的图片 Args: - client (Client): 获取 genshin 数据的 client + abyss_data (StarRailChallengeStory): 虚构叙事数据 + season (StarRailChallengeStoryGroup): 虚构叙事组数据 uid (int): 需要查询的 uid floor (int): 层数 total (bool): 是否为总览 - previous (bool): 是否为上期 Returns: bytes格式的图片 """ - abyss_data = await client.get_starrail_challenge_story(uid, previous=previous, lang="zh-cn") if not abyss_data.has_data: raise AbyssUnlocked() - if not abyss_data.groups: + if not season: raise AbyssUnlocked() - season = abyss_data.groups[0] start_time = season.begin_time.datetime.astimezone(TZ).strftime("%m月%d日 %H:%M") end_time = season.end_time.datetime.astimezone(TZ).strftime("%m月%d日 %H:%M") total_stars = f"{abyss_data.total_stars}" @@ -260,6 +283,9 @@ class ChallengeStoryPlugin(Plugin): }, } if total: + overview = await self.template_service.render( + "starrail/abyss/overview.html", render_data, viewport={"width": 750, "height": 250} + ) def floor_task(floor_index: int): _abyss_data = self.get_floor_data(abyss_data, floor_index) @@ -287,7 +313,9 @@ class ChallengeStoryPlugin(Plugin): render_group_inputs = list(map(lambda x: x[1], sorted(render_inputs, key=lambda x: x[0]))) - return await asyncio.gather(*render_group_inputs) + render_group_outputs = await asyncio.gather(*render_group_inputs) + render_group_outputs.insert(0, overview) + return render_group_outputs if floor < 1: return [ @@ -309,3 +337,253 @@ class ChallengeStoryPlugin(Plugin): "starrail/abyss/floor_story.html", render_data, viewport={"width": 690, "height": 500} ) ] + + async def save_abyss_data( + self, uid: int, abyss_data: "StarRailChallengeStory", group: "StarRailChallengeStoryGroup" + ): + model = self.history_data_abyss.create(uid, abyss_data, group) + old_data = await self.history_data_abyss.get_by_user_id_data_id(uid, model.data_id) + exists = self.history_data_abyss.exists_data(model, old_data) + if not exists: + await self.history_data_abyss.add(model) + + async def get_abyss_data(self, uid: int): + return await self.history_data_abyss.get_by_user_id(uid) + + @staticmethod + def get_season_data_name(data: "HistoryDataChallengeStory"): + start_time = data.group.begin_time.datetime.astimezone(TZ) + time = start_time.strftime("%Y.%m.%d") + honor = "" + if data.story_data.total_stars == 12: + if data.story_data.total_battles == 4: + honor = "👑" + last_battles = data.story_data.floors[-1] + num_of_characters = max( + len(last_battles.node_1.avatars), + len(last_battles.node_2.avatars), + ) + if num_of_characters == 2: + honor = "双通" + elif num_of_characters == 1: + honor = "单通" + + return f"{time} {data.story_data.total_stars} ★ {honor}" + + async def get_session_button_data(self, user_id: int, uid: int, force: bool = False): + redis = await self.cache.get(str(uid)) + if redis and not force: + return redis["buttons"] + data = await self.get_abyss_data(uid) + data.sort(key=lambda x: x.id, reverse=True) + abyss_data = [HistoryDataChallengeStory.from_data(i) for i in data] + buttons = [ + { + "name": self.get_season_data_name(abyss_data[idx]), + "value": f"get_challenge_story_history|{user_id}|{uid}|{value.id}", + } + for idx, value in enumerate(data) + ] + await self.cache.set(str(uid), {"buttons": buttons}) + return buttons + + async def gen_season_button( + self, + user_id: int, + uid: int, + page: int = 1, + ) -> List[List[InlineKeyboardButton]]: + """生成按钮""" + data = await self.get_session_button_data(user_id, uid) + if not data: + return [] + buttons = [ + InlineKeyboardButton( + value["name"], + callback_data=value["value"], + ) + for value in data + ] + all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)] + send_buttons = all_buttons[(page - 1) * 5 : page * 5] + last_page = page - 1 if page > 1 else 0 + all_page = math.ceil(len(all_buttons) / 5) + next_page = page + 1 if page < all_page and all_page > 1 else 0 + last_button = [] + if last_page: + last_button.append( + InlineKeyboardButton( + "<< 上一页", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|p_{last_page}", + ) + ) + if last_page or next_page: + last_button.append( + InlineKeyboardButton( + f"{page}/{all_page}", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|empty_data", + ) + ) + if next_page: + last_button.append( + InlineKeyboardButton( + "下一页 >>", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|p_{next_page}", + ) + ) + if last_button: + send_buttons.append(last_button) + return send_buttons + + @staticmethod + async def gen_floor_button( + data_id: int, + abyss_data: "HistoryDataChallengeStory", + user_id: int, + uid: int, + ) -> List[List[InlineKeyboardButton]]: + max_floors = len(abyss_data.story_data.floors) + buttons = [ + InlineKeyboardButton( + f"第 {i} 层", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|{data_id}|{i}", + ) + for i in range(1, max_floors + 1) + ] + send_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)] + all_buttons = [ + InlineKeyboardButton( + "<< 返回", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|p_1", + ), + InlineKeyboardButton( + "总览", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|{data_id}|total", + ), + InlineKeyboardButton( + "所有", + callback_data=f"get_challenge_story_history|{user_id}|{uid}|{data_id}|all", + ), + ] + send_buttons.append(all_buttons) + return send_buttons + + @handler.command("challenge_story_history", block=False) + @handler.message(filters.Regex(r"^虚构叙事历史数据"), block=False) + async def challenge_story_history_command_start(self, update: Update, context: CallbackContext) -> None: + user_id = await self.get_real_user_id(update) + message = update.effective_message + uid: int = await self.get_uid(user_id, context.args, message.reply_to_message) + self.log_user(update, logger.info, "查询虚构叙事历史数据 uid[%s]", uid) + + async with self.helper.genshin_or_public(user_id, uid=uid) as _: + await self.get_session_button_data(user_id, uid, force=True) + buttons = await self.gen_season_button(user_id, uid) + if not buttons: + await message.reply_text("还没有虚构叙事历史数据哦~") + return + await message.reply_text("请选择要查询的虚构叙事历史数据", reply_markup=InlineKeyboardMarkup(buttons)) + + async def get_challenge_story_history_page(self, update: "Update", user_id: int, uid: int, result: str): + """翻页处理""" + callback_query = update.callback_query + + self.log_user(update, logger.info, "切换虚构叙事历史数据页 page[%s]", result) + page = int(result.split("_")[1]) + async with self.helper.genshin_or_public(user_id) as _: + buttons = await self.gen_season_button(user_id, uid, page) + if not buttons: + await callback_query.answer("还没有虚构叙事历史数据哦~", show_alert=True) + await callback_query.edit_message_text("还没有虚构叙事历史数据哦~") + return + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + await callback_query.answer(f"已切换到第 {page} 页", show_alert=False) + + async def get_challenge_story_history_season(self, update: "Update", data_id: int): + """进入选择层数""" + callback_query = update.callback_query + user = callback_query.from_user + + self.log_user(update, logger.info, "切换虚构叙事历史数据到层数页 data_id[%s]", data_id) + data = await self.history_data_abyss.get_by_id(data_id) + if not data: + await callback_query.answer("数据不存在,请尝试重新发送命令~", show_alert=True) + await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~") + return + abyss_data = HistoryDataChallengeStory.from_data(data) + buttons = await self.gen_floor_button(data_id, abyss_data, user.id, data.user_id) + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + await callback_query.answer("已切换到层数页", show_alert=False) + + async def get_challenge_story_history_floor(self, update: "Update", data_id: int, detail: str): + """渲染层数数据""" + callback_query = update.callback_query + message = callback_query.message + + floor = 0 + total = False + if detail == "total": + floor = 0 + elif detail == "all": + total = True + else: + floor = int(detail) + data = await self.history_data_abyss.get_by_id(data_id) + if not data: + await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True) + await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~") + return + abyss_data = HistoryDataChallengeStory.from_data(data) + + images = await self.get_rendered_pic(abyss_data.story_data, abyss_data.group, data.user_id, floor, total) + if images is None: + await callback_query.answer(f"还没有第 {floor} 层的挑战数据", show_alert=True) + return + await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False) + + await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) + + for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组 + await RenderGroupResult(results=group).reply_media_group( + message, allow_sending_without_reply=True, write_timeout=60 + ) + self.log_user(update, logger.info, "[bold]虚构叙事挑战数据[/bold]: 成功发送图片", extra={"markup": True}) + self.add_delete_message_job(message, delay=1) + + @handler.callback_query(pattern=r"^get_challenge_story_history\|", block=False) + async def get_challenge_story_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + + async def get_challenge_story_history_callback( + callback_query_data: str, + ) -> Tuple[str, str, int, int]: + _data = callback_query_data.split("|") + _user_id = int(_data[1]) + _uid = int(_data[2]) + _result = _data[3] + _detail = _data[4] if len(_data) > 4 else None + logger.debug( + "callback_query_data函数返回 detail[%s] result[%s] user_id[%s] uid[%s]", + _detail, + _result, + _user_id, + _uid, + ) + return _detail, _result, _user_id, _uid + + detail, result, user_id, uid = await get_challenge_story_history_callback(callback_query.data) + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + if result == "empty_data": + await callback_query.answer(text="此按钮不可用", show_alert=True) + return + if result.startswith("p_"): + await self.get_challenge_story_history_page(update, user_id, uid, result) + return + data_id = int(result) + if detail: + await self.get_challenge_story_history_floor(update, data_id, detail) + return + await self.get_challenge_story_history_season(update, data_id)