MibooGram/plugins/genshin/ledger.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

131 lines
5.4 KiB
Python
Raw Normal View History

import os
import re
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from simnet.errors import DataNotPublic, BadRequest as SimnetBadRequest
from telegram.constants import ChatAction
from telegram.ext import filters
from core.plugin import Plugin, handler
from core.services.cookies import CookiesService
from core.services.template.models import RenderResult
from core.services.template.services import TemplateService
from plugins.tools.genshin import GenshinHelper
from utils.log import logger
from utils.uid import mask_number
if TYPE_CHECKING:
from telegram import Update
from telegram.ext import ContextTypes
from simnet import GenshinClient
__all__ = ("LedgerPlugin",)
class LedgerPlugin(Plugin):
"""旅行札记查询"""
def __init__(
self,
helper: GenshinHelper,
cookies_service: CookiesService,
template_service: TemplateService,
):
self.template_service = template_service
self.cookies_service = cookies_service
self.current_dir = os.getcwd()
self.helper = helper
async def _start_get_ledger(self, client: "GenshinClient", month=None) -> RenderResult:
diary_info = await client.get_genshin_diary(client.player_id, month=month)
color = ["#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0"]
categories = [
{
"id": i.id,
"name": i.name,
"color": color[i.id % len(color)],
"amount": i.amount,
"percentage": i.percentage,
}
for i in diary_info.month_data.categories
]
color = [i["color"] for i in categories]
def format_amount(amount: int) -> str:
return f"{round(amount / 10000, 2)}w" if amount >= 10000 else amount
ledger_data = {
"uid": mask_number(client.player_id),
"day": diary_info.month,
"current_primogems": format_amount(diary_info.month_data.current_primogems),
"gacha": int(diary_info.month_data.current_primogems / 160),
"current_mora": format_amount(diary_info.month_data.current_mora),
"last_primogems": format_amount(diary_info.month_data.last_primogems),
"last_gacha": int(diary_info.month_data.last_primogems / 160),
"last_mora": format_amount(diary_info.month_data.last_mora),
"categories": categories,
"color": color,
}
render_result = await self.template_service.render(
"genshin/ledger/ledger.jinja2", ledger_data, {"width": 580, "height": 610}
)
return render_result
@handler.command(command="ledger", block=False)
@handler.message(filters=filters.Regex("^旅行札记查询(.*)"), block=False)
async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
2024-03-10 12:40:26 +00:00
user_id = await self.get_real_user_id(update)
message = update.effective_message
now = datetime.now()
now_time = (now - timedelta(days=1)) if now.day == 1 and now.hour <= 4 else now
month = now_time.month
try:
args = self.get_args(context)
if len(args) >= 1:
month = args[0].replace("", "")
if re_data := re.findall(r"\d+", str(month)):
month = int(re_data[0])
else:
num_dict = {"": 1, "": 2, "": 3, "": 4, "": 5, "": 6, "": 7, "": 8, "": 9, "": 10}
month = sum(num_dict.get(i, 0) for i in str(month))
# check right
allow_month = [now_time.month]
last_month = now_time.replace(day=1) - timedelta(days=1)
allow_month.append(last_month.month)
last_month = last_month.replace(day=1) - timedelta(days=1)
allow_month.append(last_month.month)
if month not in allow_month and isinstance(month, int):
raise IndexError
month = now_time.month
except IndexError:
reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入")
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
return
2024-03-10 12:40:26 +00:00
self.log_user(update, logger.info, "查询旅行札记")
await message.reply_chat_action(ChatAction.TYPING)
try:
2024-03-10 12:40:26 +00:00
async with self.helper.genshin(user_id) as client:
render_result = await self._start_get_ledger(client, month)
except DataNotPublic:
reply_message = await message.reply_text(
"查询失败惹,可能是旅行札记功能被禁用了?请先通过米游社或者 hoyolab 获取一次旅行札记后重试。"
)
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
return
except SimnetBadRequest as exc:
if exc.ret_code == -120:
await message.reply_text("当前角色冒险等阶不足,暂时无法获取信息")
return
raise exc
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await render_result.reply_photo(message, filename=f"{client.player_id}.png", allow_sending_without_reply=True)