import math
import os
import re
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Dict, List, Tuple, Optional

from simnet.errors import BadRequest as SimnetBadRequest, DataNotPublic
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.ext import filters, CallbackContext, ContextTypes

from core.plugin import Plugin, handler

# NOTE(review): the module paths of the next five imports and of GenshinHelper
# were missing from the reviewed text (it read `from import CookiesService`).
# They are restored here from the usual PaiGram project layout -- confirm
# against the repository before merging.
from core.services.cookies import CookiesService
from core.services.history_data.models import HistoryDataLedger
from core.services.history_data.services import HistoryDataLedgerServices
from core.services.template.models import RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number

if TYPE_CHECKING:
    from simnet import ZZZClient
    from simnet.models.zzz.diary import ZZZDiary

__all__ = ("LedgerPlugin",)

# Layout of the history keyboard: buttons per row and rows per page.
_BUTTONS_PER_ROW = 3
_ROWS_PER_PAGE = 5

# Colour palette cycled over the income categories of a report.
_CATEGORY_COLORS = ("#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0")

# Chinese numerals accepted as a month argument; digits are summed, so
# "十一" -> 11 and "十二" -> 12.
_CHINESE_DIGITS = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9, "十": 10}


class LedgerPlugin(Plugin):
    """Monthly ledger report query (current report and stored history)."""

    def __init__(
        self,
        helper: GenshinHelper,
        cookies_service: CookiesService,
        template_service: TemplateService,
        history_data_ledger: HistoryDataLedgerServices,
        redis: RedisDB,
    ):
        self.template_service = template_service
        self.cookies_service = cookies_service
        self.current_dir = os.getcwd()
        self.helper = helper
        self.history_data_ledger = history_data_ledger
        self.cache = RedisCache(redis.client, key="plugin:ledger:history")
        # Telegram file_id of the placeholder photo once it has been uploaded;
        # None until the first upload succeeds.
        self.kitsune = None

    @staticmethod
    def _current_ledger_time() -> datetime:
        """Return the timestamp whose year/month select the "current" report.

        During the first hours of the 1st of a month (hour <= 4) the previous
        day is used, so the query still targets the previous month.
        """
        now = datetime.now()
        if now.day == 1 and now.hour <= 4:
            return now - timedelta(days=1)
        return now

    @staticmethod
    def _parse_month(text: str) -> int:
        """Parse a month argument such as ``"3"``, ``"3月"`` or ``"十二月"``.

        Returns the first run of ASCII digits as an int when one exists;
        otherwise the sum of the recognised Chinese numerals (0 when none
        match, which the caller rejects as out of range).
        """
        stripped = text.replace("月", "")
        digits = re.findall(r"\d+", stripped)
        if digits:
            return int(digits[0])
        return sum(_CHINESE_DIGITS.get(char, 0) for char in stripped)

    @staticmethod
    def _allowed_months(now_time: datetime) -> Dict[int, int]:
        """Map each of the three most recent months to its year."""
        allowed: Dict[int, int] = {}
        cursor = now_time
        for _ in range(3):
            allowed[cursor.month] = cursor.year
            # Step to the last day of the previous month.
            cursor = cursor.replace(day=1) - timedelta(days=1)
        return allowed

    async def _start_get_ledger(self, client: "ZZZClient", year, month) -> RenderResult:
        """Fetch the report for ``year``/``month``, archive it and render it."""
        req_month = f"{year}{month:02d}"
        diary_info = await client.get_zzz_diary(client.player_id, month=req_month)
        await self.save_ledger_data(self.history_data_ledger, client.player_id, diary_info)
        return await self._start_get_ledger_render(client.player_id, diary_info)

    async def _start_get_ledger_render(self, uid: int, diary_info: "ZZZDiary") -> RenderResult:
        """Render a report card.

        Args:
            uid: Game UID shown (masked) on the card.
            diary_info: Report data to render.
        """
        categories = [
            {
                "name": i.name,
                "color": _CATEGORY_COLORS[idx % len(_CATEGORY_COLORS)],
                "amount": i.amount,
                "percentage": i.percentage,
            }
            for idx, i in enumerate(diary_info.month_data.categories)
        ]
        color = [i["color"] for i in categories]

        def format_amount(amount: int) -> str:
            # Always return a string: the original returned the raw int for
            # small amounts despite the ``-> str`` annotation.
            return f"{round(amount / 10000, 2)}w" if amount >= 10000 else str(amount)

        # NOTE(review): the key expression was missing from the reviewed text;
        # `i.data_type.name` is reconstructed from the lookups below -- confirm
        # against simnet's ZZZDiary model.
        _amount_map = {i.data_type.name: i.amount for i in diary_info.month_data.list}
        current_hcoin = _amount_map.get("PolychromesData", 0)
        current_rails_pass = _amount_map.get("MatserTapeData", 0)
        current_boo_pass = _amount_map.get("BooponsData", 0)

        ledger_data = {
            "uid": mask_number(uid),
            "day": diary_info.month,
            "current_hcoin": format_amount(current_hcoin),
            "gacha": int(current_hcoin / 160),
            "current_rails_pass": format_amount(current_rails_pass),
            "current_boo_pass": format_amount(current_boo_pass),
            "categories": categories,
            "color": color,
            "nickname": diary_info.role_info.nickname,
            "avatar": diary_info.role_info.avatar,
        }
        render_result = await self.template_service.render(
            "zzz/ledger/ledger.html", ledger_data, {"width": 640, "height": 610}
        )
        return render_result

    @handler.command(command="ledger", cookie=True, block=False)
    @handler.message(filters=filters.Regex("^绳网月报查询(.*)"), block=False)
    async def command_start(self, update: Update, context: CallbackContext) -> None:
        """Handle the ledger command: render the report for a recent month.

        An optional first argument selects the month; only the three most
        recent months are accepted.
        """
        user_id = await self.get_real_user_id(update)
        message = update.effective_message
        uid, offset = self.get_real_uid_or_offset(update)

        now_time = self._current_ledger_time()
        month = now_time.month
        try:
            args = self.get_args(context)
            if len(args) >= 1:
                month = self._parse_month(args[0])
            allow_month_year = self._allowed_months(now_time)
            if (month not in allow_month_year) or (not isinstance(month, int)):
                raise IndexError
            year = allow_month_year[month]
        except IndexError:
            reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入")
            if filters.ChatType.GROUPS.filter(message):
                self.add_delete_message_job(reply_message, delay=30)
                self.add_delete_message_job(message, delay=30)
            return

        self.log_user(update, logger.info, "查询绳网月报")
        await message.reply_chat_action(ChatAction.TYPING)
        try:
            async with self.helper.genshin(user_id, player_id=uid, offset=offset) as client:
                render_result = await self._start_get_ledger(client, year, month)
        except DataNotPublic:
            reply_message = await message.reply_text(
                "查询失败惹,可能是绳网月报功能被禁用了?请先通过米游社或者 hoyolab 获取一次绳网月报后重试。"
            )
            if filters.ChatType.GROUPS.filter(message):
                self.add_delete_message_job(reply_message, delay=30)
                self.add_delete_message_job(message, delay=30)
            return
        except SimnetBadRequest as exc:
            # NOTE(review): this handler reads `retcode` while
            # ledger_use_by_inline reads `ret_code` -- confirm both attributes
            # exist on simnet's BadRequest, then unify.
            if exc.retcode == -120:
                await message.reply_text("当前角色等级不足,暂时无法获取信息")
                return
            raise
        await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
        await render_result.reply_photo(message, filename=f"{client.player_id}.png")

    @staticmethod
    async def save_ledger_data(
        history_data_ledger: "HistoryDataLedgerServices", uid: int, ledger_data: "ZZZDiary"
    ) -> bool:
        """Archive a report unless it is for the current month or already stored.

        Returns:
            True when a new record was added, False otherwise.
        """
        # Reports for the month reported as current are not archived.
        if int(ledger_data.current_month) == ledger_data.month:
            return False
        model = history_data_ledger.create(uid, ledger_data)
        old_data = await history_data_ledger.get_by_user_id_data_id(uid, model.data_id)
        if not old_data:
            await history_data_ledger.add(model)
            return True
        return False

    async def get_ledger_data(self, uid: int):
        """Return all stored history records for ``uid``."""
        return await self.history_data_ledger.get_by_user_id(uid)

    @staticmethod
    def get_season_data_name(data: "HistoryDataLedger") -> str:
        """Return the button label for a history record (its ``data_id``)."""
        return f"{data.diary_data.data_id}"

    async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
        """Return button descriptors for the history of ``uid``, newest first.

        The result is read from the cache unless ``force`` is set, and written
        back to the cache after a rebuild.
        """
        redis = await self.cache.get(str(uid))
        if redis and not force:
            return redis["buttons"]
        data = await self.get_ledger_data(uid)
        data.sort(key=lambda x: x.data_id, reverse=True)
        buttons = [
            {
                "name": LedgerPlugin.get_season_data_name(HistoryDataLedger.from_data(record)),
                "value": f"get_ledger_history|{user_id}|{uid}|{record.id}",
            }
            for record in data
        ]
        await self.cache.set(str(uid), {"buttons": buttons})
        return buttons

    async def gen_season_button(
        self,
        user_id: int,
        uid: int,
        page: int = 1,
    ) -> List[List[InlineKeyboardButton]]:
        """Build the inline keyboard rows for one page of history buttons.

        Returns an empty list when there is no history. When more than one
        page exists, a navigation row (previous / "page/total" / next) is
        appended.
        """
        data = await self.get_session_button_data(user_id, uid)
        if not data:
            return []
        buttons = [
            InlineKeyboardButton(
                value["name"],
                callback_data=value["value"],
            )
            for value in data
        ]
        all_buttons = [buttons[i : i + _BUTTONS_PER_ROW] for i in range(0, len(buttons), _BUTTONS_PER_ROW)]
        send_buttons = all_buttons[(page - 1) * _ROWS_PER_PAGE : page * _ROWS_PER_PAGE]
        # 0 means "no such page" for both neighbours.
        last_page = page - 1 if page > 1 else 0
        all_page = math.ceil(len(all_buttons) / _ROWS_PER_PAGE)
        next_page = page + 1 if page < all_page and all_page > 1 else 0
        last_button = []
        if last_page:
            last_button.append(
                InlineKeyboardButton(
                    "<< 上一页",
                    callback_data=f"get_ledger_history|{user_id}|{uid}|p_{last_page}",
                )
            )
        if last_page or next_page:
            last_button.append(
                InlineKeyboardButton(
                    f"{page}/{all_page}",
                    callback_data=f"get_ledger_history|{user_id}|{uid}|empty_data",
                )
            )
        if next_page:
            last_button.append(
                InlineKeyboardButton(
                    "下一页 >>",
                    callback_data=f"get_ledger_history|{user_id}|{uid}|p_{next_page}",
                )
            )
        if last_button:
            send_buttons.append(last_button)
        return send_buttons

    @handler.command("ledger_history", cookie=True, block=False)
    @handler.message(filters.Regex(r"^绳网月报历史数据"), block=False)
    async def ledger_history_command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
        """Handle the history command: send the history selection keyboard."""
        user_id = await self.get_real_user_id(update)
        message = update.effective_message
        uid, offset = self.get_real_uid_or_offset(update)
        self.log_user(update, logger.info, "查询绳网月报历史数据")

        async with self.helper.genshin(user_id, player_id=uid, offset=offset) as client:
            # Rebuild the cached button list so the keyboard is up to date.
            await self.get_session_button_data(user_id, client.player_id, force=True)
            buttons = await self.gen_season_button(user_id, client.player_id)
        if not buttons:
            await message.reply_text("还没有绳网月报历史数据哦~")
            return

        caption = "请选择要查询的绳网月报历史数据"
        markup = InlineKeyboardMarkup(buttons)
        if isinstance(self.kitsune, str):
            # Reuse the file_id remembered from an earlier upload.
            reply_message = await message.reply_photo(self.kitsune, caption, reply_markup=markup)
        else:
            # Close the file handle once the upload finished (it used to leak).
            with open("resources/img/aaa.jpg", "rb") as photo:
                reply_message = await message.reply_photo(photo, caption, reply_markup=markup)
        if reply_message.photo:
            self.kitsune = reply_message.photo[-1].file_id

    async def get_ledger_history_page(self, update: "Update", user_id: int, uid: int, result: str):
        """Handle a page-switch callback (``result`` has the form ``p_<page>``)."""
        callback_query = update.callback_query

        self.log_user(update, logger.info, "切换绳网月报历史数据页 page[%s]", result)

        page = int(result.split("_")[1])
        async with self.helper.genshin(user_id, player_id=uid) as client:
            buttons = await self.gen_season_button(user_id, client.player_id, page)
        if not buttons:
            await callback_query.answer("还没有绳网月报历史数据哦~", show_alert=True)
            await callback_query.edit_message_text("还没有绳网月报历史数据哦~")
            return
        await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
        await callback_query.answer(f"已切换到第 {page} 页", show_alert=False)

    @staticmethod
    def _parse_history_callback(callback_query_data: str) -> Tuple[str, int, int]:
        """Split ``get_ledger_history|<user_id>|<uid>|<result>`` callback data.

        Returns:
            ``(result, user_id, uid)``.
        """
        _data = callback_query_data.split("|")
        _user_id = int(_data[1])
        _uid = int(_data[2])
        _result = _data[3]
        logger.debug(
            "callback_query_data函数返回 result[%s] user_id[%s] uid[%s]",
            _result,
            _user_id,
            _uid,
        )
        return _result, _user_id, _uid

    @handler.callback_query(pattern=r"^get_ledger_history\|", block=False)
    async def get_ledger_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
        """Handle a press on a history keyboard button.

        Rejects presses from users other than the keyboard owner, dispatches
        page switches, and otherwise renders the selected stored report.
        """
        callback_query = update.callback_query
        message = callback_query.message
        user = callback_query.from_user

        result, user_id, uid = self._parse_history_callback(callback_query.data)
        if user.id != user_id:
            await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
            return
        if result == "empty_data":
            await callback_query.answer(text="此按钮不可用", show_alert=True)
            return
        if result.startswith("p_"):
            await self.get_ledger_history_page(update, user_id, uid, result)
            return
        data_id = int(result)
        data = await self.history_data_ledger.get_by_id(data_id)
        if not data:
            await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
            await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
            return
        await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮")
        # Pass the game UID: the renderer masks and prints this value on the
        # card. The Telegram user id was passed here before, which was wrong.
        render = await self._start_get_ledger_render(uid, HistoryDataLedger.from_data(data).diary_data)
        await render.edit_media(message)

    async def ledger_use_by_inline(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
        """Inline-mode entry point: render the current month's report."""
        callback_query = update.callback_query
        user = update.effective_user
        user_id = user.id
        uid = IInlineUseData.get_uid_from_context(context)

        self.log_user(update, logger.info, "查询绳网月报")
        now_time = self._current_ledger_time()
        year, month = now_time.year, now_time.month
        try:
            async with self.helper.genshin(user_id, player_id=uid) as client:
                render_result = await self._start_get_ledger(client, year, month)
        except DataNotPublic:
            await callback_query.answer(
                "查询失败惹,可能是绳网月报功能被禁用了?请先通过米游社或者 hoyolab 获取一次绳网月报后重试。",
                show_alert=True,
            )
            return
        except SimnetBadRequest as exc:
            # NOTE(review): this handler reads `ret_code` while command_start
            # reads `retcode` -- confirm both attributes exist, then unify.
            if exc.ret_code == -120:
                await callback_query.answer(
                    "当前角色等级不足,暂时无法获取信息",
                    show_alert=True,
                )
                return
            raise

        await render_result.edit_inline_media(callback_query)

    async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]:
        """Return the inline-mode entries this plugin provides."""
        return [
            IInlineUseData(
                text="当月绳网月报",
                hash="ledger",
                callback=self.ledger_use_by_inline,
                cookie=True,
                player=True,
            )
        ]