Support ledger history

This commit is contained in:
omg-xtao 2024-05-24 17:40:09 +08:00 committed by GitHub
parent 3237adfb97
commit 2770c7a559
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 211 additions and 5 deletions

View File

@ -3,6 +3,7 @@ from typing import Dict
from pydantic import BaseModel
from simnet.models.genshin.chronicle.abyss import SpiralAbyss
from simnet.models.genshin.diary import Diary
from gram_core.services.history_data.models import HistoryData
@ -10,11 +11,13 @@ __all__ = (
"HistoryData",
"HistoryDataTypeEnum",
"HistoryDataAbyss",
"HistoryDataLedger",
)
class HistoryDataTypeEnum(int, enum.Enum):
ABYSS = 0 # 深境螺旋
LEDGER = 2 # 开拓月历
class HistoryDataAbyss(BaseModel):
@ -22,5 +25,13 @@ class HistoryDataAbyss(BaseModel):
character_data: Dict[int, int]
@classmethod
def from_data(cls, data: HistoryData):
def from_data(cls, data: HistoryData) -> "HistoryDataAbyss":
return cls.parse_obj(data.data)
class HistoryDataLedger(BaseModel):
diary_data: Diary
@classmethod
def from_data(cls, data: HistoryData) -> "HistoryDataLedger":
return cls.parse_obj(data.data)

View File

@ -3,8 +3,9 @@ from typing import Dict, List
from pytz import timezone
from simnet.models.genshin.chronicle.abyss import SpiralAbyss
from simnet.models.genshin.diary import Diary
from core.services.history_data.models import HistoryData, HistoryDataTypeEnum, HistoryDataAbyss
from core.services.history_data.models import HistoryData, HistoryDataTypeEnum, HistoryDataAbyss, HistoryDataLedger
from gram_core.base_service import BaseService
from gram_core.services.history_data.services import HistoryDataBaseServices
@ -46,3 +47,19 @@ class HistoryDataAbyssServices(BaseService, HistoryDataBaseServices):
type=HistoryDataAbyssServices.DATA_TYPE,
data=jsonlib.loads(json_data),
)
class HistoryDataLedgerServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.LEDGER.value
@staticmethod
def create(user_id: int, diary_data: Diary):
data = HistoryDataLedger(diary_data=diary_data)
json_data = data.json(by_alias=True, encoder=json_encoder)
return HistoryData(
user_id=user_id,
data_id=diary_data.data_id,
time_created=datetime.datetime.now(),
type=HistoryDataLedgerServices.DATA_TYPE,
data=jsonlib.loads(json_data),
)

View File

@ -62,6 +62,7 @@ class SetCommandPlugin(Plugin):
# Cookie 查询类
BotCommand("dailynote", "查询实时便笺"),
BotCommand("ledger", "查询当月旅行札记"),
BotCommand("ledger_history", "查询旅行札记历史记录"),
BotCommand("abyss", "查询深渊战绩"),
BotCommand("abyss_team", "查询深渊推荐配队"),
BotCommand("abyss_history", "查询深渊历史战绩"),

View File

@ -1,17 +1,24 @@
import math
import os
import re
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List, Tuple
from simnet.errors import DataNotPublic, BadRequest as SimnetBadRequest
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
from telegram.constants import ChatAction
from telegram.ext import filters
from core.plugin import Plugin, handler
from core.services.cookies import CookiesService
from core.services.history_data.models import HistoryDataLedger
from core.services.history_data.services import HistoryDataLedgerServices
from core.services.template.models import RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
@ -19,6 +26,7 @@ if TYPE_CHECKING:
from telegram import Update
from telegram.ext import ContextTypes
from simnet import GenshinClient
from simnet.models.genshin.diary import Diary
__all__ = ("LedgerPlugin",)
@ -31,14 +39,24 @@ class LedgerPlugin(Plugin):
helper: GenshinHelper,
cookies_service: CookiesService,
template_service: TemplateService,
history_data_ledger: HistoryDataLedgerServices,
redis: RedisDB,
):
self.template_service = template_service
self.cookies_service = cookies_service
self.current_dir = os.getcwd()
self.helper = helper
self.history_data_ledger = history_data_ledger
self.cache = RedisCache(redis.client, key="plugin:ledger:history")
self.kitsune = None
async def _start_get_ledger(self, client: "GenshinClient", month=None) -> RenderResult:
diary_info = await client.get_genshin_diary(client.player_id, month=month)
if month:
await self.save_ledger_data(client.player_id, diary_info)
return await self._start_get_ledger_render(client.player_id, diary_info)
async def _start_get_ledger_render(self, uid: int, diary_info: "Diary") -> RenderResult:
color = ["#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0"]
categories = [
{
@ -56,7 +74,7 @@ class LedgerPlugin(Plugin):
return f"{round(amount / 10000, 2)}w" if amount >= 10000 else amount
ledger_data = {
"uid": mask_number(client.player_id),
"uid": mask_number(uid),
"day": diary_info.month,
"current_primogems": format_amount(diary_info.month_data.current_primogems),
"gacha": int(diary_info.month_data.current_primogems / 160),
@ -101,7 +119,6 @@ class LedgerPlugin(Plugin):
if month not in allow_month and isinstance(month, int):
raise IndexError
month = now_time.month
except IndexError:
reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入")
if filters.ChatType.GROUPS.filter(message):
@ -128,3 +145,163 @@ class LedgerPlugin(Plugin):
raise exc
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await render_result.reply_photo(message, filename=f"{client.player_id}.png", allow_sending_without_reply=True)
async def save_ledger_data(self, uid: int, ledger_data: "Diary"):
month = int(ledger_data.date.split("-")[1])
if month == ledger_data.month:
return
model = self.history_data_ledger.create(uid, ledger_data)
old_data = await self.history_data_ledger.get_by_user_id_data_id(uid, model.data_id)
if not old_data:
await self.history_data_ledger.add(model)
async def get_ledger_data(self, uid: int):
return await self.history_data_ledger.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataLedger") -> str:
return f"{data.diary_data.data_id}"
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_ledger_data(uid)
data.sort(key=lambda x: x.data_id, reverse=True)
abyss_data = [HistoryDataLedger.from_data(i) for i in data]
buttons = [
{
"name": LedgerPlugin.get_season_data_name(abyss_data[idx]),
"value": f"get_ledger_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
send_buttons = all_buttons[(page - 1) * 5 : page * 5]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 5)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_ledger_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_ledger_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_ledger_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@handler.command("ledger_history", block=False)
@handler.message(filters.Regex(r"^旅行札记历史数据"), block=False)
async def ledger_history_command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
self.log_user(update, logger.info, "查询旅行札记历史数据")
async with self.helper.genshin(user_id) as client:
await self.get_session_button_data(user_id, client.player_id, force=True)
buttons = await self.gen_season_button(user_id, client.player_id)
if not buttons:
await message.reply_text("还没有旅行札记历史数据哦~")
return
if isinstance(self.kitsune, str):
photo = self.kitsune
else:
photo = open("resources/img/kitsune.png", "rb")
reply_message = await message.reply_photo(
photo, "请选择要查询的旅行札记历史数据", reply_markup=InlineKeyboardMarkup(buttons)
)
if reply_message.photo:
self.kitsune = reply_message.photo[-1].file_id
async def get_ledger_history_page(self, update: "Update", user_id: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换旅行札记历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin(user_id) as client:
buttons = await self.gen_season_button(user_id, client.player_id, page)
if not buttons:
await callback_query.answer("还没有旅行札记历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有旅行札记历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
@handler.callback_query(pattern=r"^get_ledger_history\|", block=False)
async def get_ledger_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
message = callback_query.message
user = callback_query.from_user
async def get_ledger_history_callback(
callback_query_data: str,
) -> Tuple[str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
logger.debug(
"callback_query_data函数返回 result[%s] user_id[%s] uid[%s]",
_result,
_user_id,
_uid,
)
return _result, _user_id, _uid
result, user_id, _ = await get_ledger_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_ledger_history_page(update, user_id, result)
return
data_id = int(result)
data = await self.history_data_ledger.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮")
render = await self._start_get_ledger_render(user_id, HistoryDataLedger.from_data(data).diary_data)
await render.edit_media(message)