MibooGram/plugins/zzz/ledger.py

360 lines
15 KiB
Python
Raw Normal View History

2024-09-25 15:15:46 +00:00
import math
import os
import re
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, List, Tuple, Optional
from simnet.errors import BadRequest as SimnetBadRequest, DataNotPublic
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.ext import filters, CallbackContext, ContextTypes
from core.plugin import Plugin, handler
from core.services.cookies import CookiesService
from core.services.history_data.models import HistoryDataLedger
from core.services.history_data.services import HistoryDataLedgerServices
from core.services.template.models import RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
if TYPE_CHECKING:
from simnet import ZZZClient
from simnet.models.zzz.diary import ZZZDiary
__all__ = ("LedgerPlugin",)
class LedgerPlugin(Plugin):
"""绳网月报查询"""
def __init__(
self,
helper: GenshinHelper,
cookies_service: CookiesService,
template_service: TemplateService,
history_data_ledger: HistoryDataLedgerServices,
redis: RedisDB,
):
self.template_service = template_service
self.cookies_service = cookies_service
self.current_dir = os.getcwd()
self.helper = helper
self.history_data_ledger = history_data_ledger
self.cache = RedisCache(redis.client, key="plugin:ledger:history")
self.kitsune = None
async def _start_get_ledger(self, client: "ZZZClient", year, month) -> RenderResult:
req_month = f"{year}0{month}" if month < 10 else f"{year}{month}"
diary_info = await client.get_zzz_diary(client.player_id, month=req_month)
await self.save_ledger_data(self.history_data_ledger, client.player_id, diary_info)
return await self._start_get_ledger_render(client.player_id, diary_info)
async def _start_get_ledger_render(self, uid: int, diary_info: "ZZZDiary") -> RenderResult:
color = ["#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0"]
categories = [
{
"name": i.name,
"color": color[idx % len(color)],
"amount": i.amount,
"percentage": i.percentage,
}
for idx, i in enumerate(diary_info.month_data.categories)
]
color = [i["color"] for i in categories]
def format_amount(amount: int) -> str:
return f"{round(amount / 10000, 2)}w" if amount >= 10000 else amount
_amount_map = {i.id: i.amount for i in diary_info.month_data.list}
current_hcoin = _amount_map.get("PolychromesData", 0)
current_rails_pass = _amount_map.get("MatserTapeData", 0)
current_boo_pass = _amount_map.get("BooponsData", 0)
ledger_data = {
"uid": mask_number(uid),
"day": diary_info.month,
"current_hcoin": format_amount(current_hcoin),
"gacha": int(current_hcoin / 160),
"current_rails_pass": format_amount(current_rails_pass),
"current_boo_pass": format_amount(current_boo_pass),
"categories": categories,
"color": color,
"nickname": diary_info.role_info.nickname,
"avatar": diary_info.role_info.avatar,
}
render_result = await self.template_service.render(
"zzz/ledger/ledger.html", ledger_data, {"width": 640, "height": 610}
)
return render_result
@handler.command(command="ledger", cookie=True, block=False)
@handler.message(filters=filters.Regex("^绳网月报查询(.*)"), block=False)
async def command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
now = datetime.now()
now_time = (now - timedelta(days=1)) if now.day == 1 and now.hour <= 4 else now
month = now_time.month
try:
args = self.get_args(context)
if len(args) >= 1:
month = args[0].replace("", "")
if re_data := re.findall(r"\d+", str(month)):
month = int(re_data[0])
else:
num_dict = {"": 1, "": 2, "": 3, "": 4, "": 5, "": 6, "": 7, "": 8, "": 9, "": 10}
month = sum(num_dict.get(i, 0) for i in str(month))
# check right
allow_month_year = {now_time.month: now_time.year}
last_month = now_time.replace(day=1) - timedelta(days=1)
allow_month_year[last_month.month] = last_month.year
last_month = last_month.replace(day=1) - timedelta(days=1)
allow_month_year[last_month.month] = last_month.year
if (month not in allow_month_year) or (not isinstance(month, int)):
raise IndexError
year = allow_month_year[month]
except IndexError:
reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入")
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
return
self.log_user(update, logger.info, "查询绳网月报")
await message.reply_chat_action(ChatAction.TYPING)
try:
async with self.helper.genshin(user_id, player_id=uid, offset=offset) as client:
render_result = await self._start_get_ledger(client, year, month)
except DataNotPublic:
reply_message = await message.reply_text(
"查询失败惹,可能是绳网月报功能被禁用了?请先通过米游社或者 hoyolab 获取一次绳网月报后重试。"
)
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
return
except SimnetBadRequest as exc:
if exc.retcode == -120:
await message.reply_text("当前角色等级不足,暂时无法获取信息")
return
raise exc
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await render_result.reply_photo(message, filename=f"{client.player_id}.png")
@staticmethod
async def save_ledger_data(
history_data_ledger: "HistoryDataLedgerServices", uid: int, ledger_data: "ZZZDiary"
) -> bool:
if int(ledger_data.current_month) == ledger_data.month:
return False
model = history_data_ledger.create(uid, ledger_data)
old_data = await history_data_ledger.get_by_user_id_data_id(uid, model.data_id)
if not old_data:
await history_data_ledger.add(model)
return True
return False
async def get_ledger_data(self, uid: int):
return await self.history_data_ledger.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataLedger") -> str:
return f"{data.diary_data.data_id}"
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_ledger_data(uid)
data.sort(key=lambda x: x.data_id, reverse=True)
abyss_data = [HistoryDataLedger.from_data(i) for i in data]
buttons = [
{
"name": LedgerPlugin.get_season_data_name(abyss_data[idx]),
"value": f"get_ledger_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
send_buttons = all_buttons[(page - 1) * 5 : page * 5]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 5)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_ledger_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_ledger_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_ledger_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@handler.command("ledger_history", cookie=True, block=False)
@handler.message(filters.Regex(r"^绳网月报历史数据"), block=False)
async def ledger_history_command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
self.log_user(update, logger.info, "查询绳网月报历史数据")
async with self.helper.genshin(user_id, player_id=uid, offset=offset) as client:
await self.get_session_button_data(user_id, client.player_id, force=True)
buttons = await self.gen_season_button(user_id, client.player_id)
if not buttons:
await message.reply_text("还没有绳网月报历史数据哦~")
return
if isinstance(self.kitsune, str):
photo = self.kitsune
else:
photo = open("resources/img/aaa.jpg", "rb")
reply_message = await message.reply_photo(
photo, "请选择要查询的绳网月报历史数据", reply_markup=InlineKeyboardMarkup(buttons)
)
if reply_message.photo:
self.kitsune = reply_message.photo[-1].file_id
async def get_ledger_history_page(self, update: "Update", user_id: int, uid: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换绳网月报历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin(user_id, player_id=uid) as client:
buttons = await self.gen_season_button(user_id, client.player_id, page)
if not buttons:
await callback_query.answer("还没有绳网月报历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有绳网月报历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
@handler.callback_query(pattern=r"^get_ledger_history\|", block=False)
async def get_ledger_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
message = callback_query.message
user = callback_query.from_user
async def get_ledger_history_callback(
callback_query_data: str,
) -> Tuple[str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
logger.debug(
"callback_query_data函数返回 result[%s] user_id[%s] uid[%s]",
_result,
_user_id,
_uid,
)
return _result, _user_id, _uid
result, user_id, uid = await get_ledger_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_ledger_history_page(update, user_id, uid, result)
return
data_id = int(result)
data = await self.history_data_ledger.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮")
render = await self._start_get_ledger_render(user_id, HistoryDataLedger.from_data(data).diary_data)
await render.edit_media(message)
async def ledger_use_by_inline(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
callback_query = update.callback_query
user = update.effective_user
user_id = user.id
uid = IInlineUseData.get_uid_from_context(context)
self.log_user(update, logger.info, "查询绳网月报")
now = datetime.now()
now_time = (now - timedelta(days=1)) if now.day == 1 and now.hour <= 4 else now
year, month = now_time.year, now_time.month
try:
async with self.helper.genshin(user_id, player_id=uid) as client:
render_result = await self._start_get_ledger(client, year, month)
except DataNotPublic:
await callback_query.answer(
"查询失败惹,可能是绳网月报功能被禁用了?请先通过米游社或者 hoyolab 获取一次绳网月报后重试。",
show_alert=True,
)
return
except SimnetBadRequest as exc:
if exc.ret_code == -120:
await callback_query.answer(
"当前角色等级不足,暂时无法获取信息",
show_alert=True,
)
return
raise exc
await render_result.edit_inline_media(callback_query)
async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]:
return [
IInlineUseData(
text="当月绳网月报",
hash="ledger",
callback=self.ledger_use_by_inline,
cookie=True,
player=True,
)
]