From 2770c7a5599a3b6f44cfa06df8d1b5a56c395d40 Mon Sep 17 00:00:00 2001 From: omg-xtao <100690902+omg-xtao@users.noreply.github.com> Date: Fri, 24 May 2024 17:40:09 +0800 Subject: [PATCH] :sparkles: Support ledger history --- core/services/history_data/models.py | 13 +- core/services/history_data/services.py | 19 ++- plugins/admin/set_command.py | 1 + plugins/genshin/ledger.py | 183 ++++++++++++++++++++++++- 4 files changed, 211 insertions(+), 5 deletions(-) diff --git a/core/services/history_data/models.py b/core/services/history_data/models.py index 1b543b78..c18e8b1d 100644 --- a/core/services/history_data/models.py +++ b/core/services/history_data/models.py @@ -3,6 +3,7 @@ from typing import Dict from pydantic import BaseModel from simnet.models.genshin.chronicle.abyss import SpiralAbyss +from simnet.models.genshin.diary import Diary from gram_core.services.history_data.models import HistoryData @@ -10,11 +11,13 @@ __all__ = ( "HistoryData", "HistoryDataTypeEnum", "HistoryDataAbyss", + "HistoryDataLedger", ) class HistoryDataTypeEnum(int, enum.Enum): ABYSS = 0 # 深境螺旋 + LEDGER = 2 # 开拓月历 class HistoryDataAbyss(BaseModel): @@ -22,5 +25,13 @@ class HistoryDataAbyss(BaseModel): character_data: Dict[int, int] @classmethod - def from_data(cls, data: HistoryData): + def from_data(cls, data: HistoryData) -> "HistoryDataAbyss": + return cls.parse_obj(data.data) + + +class HistoryDataLedger(BaseModel): + diary_data: Diary + + @classmethod + def from_data(cls, data: HistoryData) -> "HistoryDataLedger": return cls.parse_obj(data.data) diff --git a/core/services/history_data/services.py b/core/services/history_data/services.py index 82f3ff88..daee952b 100644 --- a/core/services/history_data/services.py +++ b/core/services/history_data/services.py @@ -3,8 +3,9 @@ from typing import Dict, List from pytz import timezone from simnet.models.genshin.chronicle.abyss import SpiralAbyss +from simnet.models.genshin.diary import Diary -from core.services.history_data.models import HistoryData, HistoryDataTypeEnum, HistoryDataAbyss +from core.services.history_data.models import HistoryData, HistoryDataTypeEnum, HistoryDataAbyss, HistoryDataLedger from gram_core.base_service import BaseService from gram_core.services.history_data.services import HistoryDataBaseServices @@ -46,3 +47,19 @@ class HistoryDataAbyssServices(BaseService, HistoryDataBaseServices): type=HistoryDataAbyssServices.DATA_TYPE, data=jsonlib.loads(json_data), ) + + +class HistoryDataLedgerServices(BaseService, HistoryDataBaseServices): + DATA_TYPE = HistoryDataTypeEnum.LEDGER.value + + @staticmethod + def create(user_id: int, diary_data: Diary): + data = HistoryDataLedger(diary_data=diary_data) + json_data = data.json(by_alias=True, encoder=json_encoder) + return HistoryData( + user_id=user_id, + data_id=diary_data.data_id, + time_created=datetime.datetime.now(), + type=HistoryDataLedgerServices.DATA_TYPE, + data=jsonlib.loads(json_data), + ) diff --git a/plugins/admin/set_command.py b/plugins/admin/set_command.py index 5a5d2553..e2c30ba3 100644 --- a/plugins/admin/set_command.py +++ b/plugins/admin/set_command.py @@ -62,6 +62,7 @@ class SetCommandPlugin(Plugin): # Cookie 查询类 BotCommand("dailynote", "查询实时便笺"), BotCommand("ledger", "查询当月旅行札记"), + BotCommand("ledger_history", "查询旅行札记历史记录"), BotCommand("abyss", "查询深渊战绩"), BotCommand("abyss_team", "查询深渊推荐配队"), BotCommand("abyss_history", "查询深渊历史战绩"), diff --git a/plugins/genshin/ledger.py b/plugins/genshin/ledger.py index 7cd7e891..6b3cd3f1 100644 --- a/plugins/genshin/ledger.py +++ b/plugins/genshin/ledger.py @@ -1,17 +1,24 @@ +import math import os import re from datetime import datetime, timedelta -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Tuple from simnet.errors import DataNotPublic, BadRequest as SimnetBadRequest +from telegram import InlineKeyboardMarkup, InlineKeyboardButton from telegram.constants import ChatAction from telegram.ext import filters from core.plugin import Plugin, handler from core.services.cookies import CookiesService +from core.services.history_data.models import HistoryDataLedger +from core.services.history_data.services import HistoryDataLedgerServices from core.services.template.models import RenderResult from core.services.template.services import TemplateService +from gram_core.config import config +from gram_core.dependence.redisdb import RedisDB from plugins.tools.genshin import GenshinHelper +from utils.enkanetwork import RedisCache from utils.log import logger from utils.uid import mask_number @@ -19,6 +26,7 @@ if TYPE_CHECKING: from telegram import Update from telegram.ext import ContextTypes from simnet import GenshinClient + from simnet.models.genshin.diary import Diary __all__ = ("LedgerPlugin",) @@ -31,14 +39,24 @@ class LedgerPlugin(Plugin): helper: GenshinHelper, cookies_service: CookiesService, template_service: TemplateService, + history_data_ledger: HistoryDataLedgerServices, + redis: RedisDB, ): self.template_service = template_service self.cookies_service = cookies_service self.current_dir = os.getcwd() self.helper = helper + self.history_data_ledger = history_data_ledger + self.cache = RedisCache(redis.client, key="plugin:ledger:history") + self.kitsune = None async def _start_get_ledger(self, client: "GenshinClient", month=None) -> RenderResult: diary_info = await client.get_genshin_diary(client.player_id, month=month) + if month: + await self.save_ledger_data(client.player_id, diary_info) + return await self._start_get_ledger_render(client.player_id, diary_info) + + async def _start_get_ledger_render(self, uid: int, diary_info: "Diary") -> RenderResult: color = ["#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0"] categories = [ { @@ -56,7 +74,7 @@ class LedgerPlugin(Plugin): return f"{round(amount / 10000, 2)}w" if amount >= 10000 else amount ledger_data = { - "uid": mask_number(client.player_id), + "uid": mask_number(uid), "day": diary_info.month, "current_primogems": format_amount(diary_info.month_data.current_primogems), "gacha": int(diary_info.month_data.current_primogems / 160), @@ -101,7 +119,6 @@ class LedgerPlugin(Plugin): if month not in allow_month and isinstance(month, int): raise IndexError - month = now_time.month except IndexError: reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入") if filters.ChatType.GROUPS.filter(message): @@ -128,3 +145,163 @@ class LedgerPlugin(Plugin): raise exc await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) await render_result.reply_photo(message, filename=f"{client.player_id}.png", allow_sending_without_reply=True) + + async def save_ledger_data(self, uid: int, ledger_data: "Diary"): + month = int(ledger_data.date.split("-")[1]) + if month == ledger_data.month: + return + model = self.history_data_ledger.create(uid, ledger_data) + old_data = await self.history_data_ledger.get_by_user_id_data_id(uid, model.data_id) + if not old_data: + await self.history_data_ledger.add(model) + + async def get_ledger_data(self, uid: int): + return await self.history_data_ledger.get_by_user_id(uid) + + @staticmethod + def get_season_data_name(data: "HistoryDataLedger") -> str: + return f"{data.diary_data.data_id}" + + async def get_session_button_data(self, user_id: int, uid: int, force: bool = False): + redis = await self.cache.get(str(uid)) + if redis and not force: + return redis["buttons"] + data = await self.get_ledger_data(uid) + data.sort(key=lambda x: x.data_id, reverse=True) + abyss_data = [HistoryDataLedger.from_data(i) for i in data] + buttons = [ + { + "name": LedgerPlugin.get_season_data_name(abyss_data[idx]), + "value": f"get_ledger_history|{user_id}|{uid}|{value.id}", + } + for idx, value in enumerate(data) + ] + await self.cache.set(str(uid), {"buttons": buttons}) + return buttons + + async def gen_season_button( + self, + user_id: int, + uid: int, + page: int = 1, + ) -> List[List[InlineKeyboardButton]]: + """生成按钮""" + data = await self.get_session_button_data(user_id, uid) + if not data: + return [] + buttons = [ + InlineKeyboardButton( + value["name"], + callback_data=value["value"], + ) + for value in data + ] + all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)] + send_buttons = all_buttons[(page - 1) * 5 : page * 5] + last_page = page - 1 if page > 1 else 0 + all_page = math.ceil(len(all_buttons) / 5) + next_page = page + 1 if page < all_page and all_page > 1 else 0 + last_button = [] + if last_page: + last_button.append( + InlineKeyboardButton( + "<< 上一页", + callback_data=f"get_ledger_history|{user_id}|{uid}|p_{last_page}", + ) + ) + if last_page or next_page: + last_button.append( + InlineKeyboardButton( + f"{page}/{all_page}", + callback_data=f"get_ledger_history|{user_id}|{uid}|empty_data", + ) + ) + if next_page: + last_button.append( + InlineKeyboardButton( + "下一页 >>", + callback_data=f"get_ledger_history|{user_id}|{uid}|p_{next_page}", + ) + ) + if last_button: + send_buttons.append(last_button) + return send_buttons + + @handler.command("ledger_history", block=False) + @handler.message(filters.Regex(r"^旅行札记历史数据"), block=False) + async def ledger_history_command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + user_id = await self.get_real_user_id(update) + message = update.effective_message + self.log_user(update, logger.info, "查询旅行札记历史数据") + + async with self.helper.genshin(user_id) as client: + await self.get_session_button_data(user_id, client.player_id, force=True) + buttons = await self.gen_season_button(user_id, client.player_id) + if not buttons: + await message.reply_text("还没有旅行札记历史数据哦~") + return + if isinstance(self.kitsune, str): + photo = self.kitsune + else: + photo = open("resources/img/kitsune.png", "rb") + reply_message = await message.reply_photo( + photo, "请选择要查询的旅行札记历史数据", reply_markup=InlineKeyboardMarkup(buttons) + ) + if reply_message.photo: + self.kitsune = reply_message.photo[-1].file_id + + async def get_ledger_history_page(self, update: "Update", user_id: int, result: str): + """翻页处理""" + callback_query = update.callback_query + + self.log_user(update, logger.info, "切换旅行札记历史数据页 page[%s]", result) + page = int(result.split("_")[1]) + async with self.helper.genshin(user_id) as client: + buttons = await self.gen_season_button(user_id, client.player_id, page) + if not buttons: + await callback_query.answer("还没有旅行札记历史数据哦~", show_alert=True) + await callback_query.edit_message_text("还没有旅行札记历史数据哦~") + return + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + await callback_query.answer(f"已切换到第 {page} 页", show_alert=False) + + @handler.callback_query(pattern=r"^get_ledger_history\|", block=False) + async def get_ledger_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + message = callback_query.message + user = callback_query.from_user + + async def get_ledger_history_callback( + callback_query_data: str, + ) -> Tuple[str, int, int]: + _data = callback_query_data.split("|") + _user_id = int(_data[1]) + _uid = int(_data[2]) + _result = _data[3] + logger.debug( + "callback_query_data函数返回 result[%s] user_id[%s] uid[%s]", + _result, + _user_id, + _uid, + ) + return _result, _user_id, _uid + + result, user_id, _ = await get_ledger_history_callback(callback_query.data) + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + if result == "empty_data": + await callback_query.answer(text="此按钮不可用", show_alert=True) + return + if result.startswith("p_"): + await self.get_ledger_history_page(update, user_id, result) + return + data_id = int(result) + data = await self.history_data_ledger.get_by_id(data_id) + if not data: + await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True) + await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~") + return + await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮") + render = await self._start_get_ledger_render(user_id, HistoryDataLedger.from_data(data).diary_data) + await render.edit_media(message)