PaiGram/plugins/ledger.py
2022-08-05 19:14:29 +08:00

177 lines
7.1 KiB
Python

import json
import os
import re
from datetime import datetime, timedelta
from genshin import GenshinException, DataNotPublic
from telegram import Update
from telegram.constants import ChatAction
from telegram.ext import CallbackContext, CommandHandler, MessageHandler, ConversationHandler, filters
from apps.cookies.services import CookiesService
from apps.template.services import TemplateService
from apps.user.repositories import UserNotFoundError
from apps.user.services import UserService
from logger import Log
from plugins.base import BasePlugins
from utils.apps.inject import inject
from utils.bot import get_all_args
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
from utils.plugins.manager import listener_plugins_class
def check_ledger_month(context: CallbackContext) -> int:
month = datetime.now().month
args = get_all_args(context)
if len(args) >= 1:
month = args[0]
pass
elif re_data := re.findall(r"\d+", month):
month = int(re_data[0])
else:
num_dict = {"": 1, "": 2, "": 3, "": 4, "": 5,
"": 6, "": 7, "": 8, "": 9, "": 10}
month = sum(num_dict.get(i, 0) for i in month)
# check right
now_time = datetime.now()
allow_month = [datetime.now().month]
last_month = now_time.replace(day=1) - timedelta(days=1)
allow_month.append(last_month.month)
last_month = last_month.replace(day=1) - timedelta(days=1)
allow_month.append(last_month.month)
if month in allow_month:
return month
elif isinstance(month, int):
raise IndexError
return now_time.month
@listener_plugins_class()
class Ledger(BasePlugins):
"""旅行札记"""
COMMAND_RESULT, = range(10200, 10201)
@inject
def __init__(self, user_service: UserService = None, cookies_service: CookiesService = None,
template_service: TemplateService = None):
self.template_service = template_service
self.cookies_service = cookies_service
self.user_service = user_service
self.current_dir = os.getcwd()
@classmethod
def create_handlers(cls):
ledger = cls()
return [CommandHandler("ledger", ledger.command_start, block=True),
MessageHandler(filters.Regex(r"^旅行扎记(.*)"), ledger.command_start, block=True)]
async def _start_get_ledger(self, client, month=None) -> bytes:
try:
diary_info = await client.get_diary(client.uid, month=month)
except GenshinException as error:
raise error
color = ["#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0"]
categories = [{"id": i.id,
"name": i.name,
"color": color[i.id % len(color)],
"amount": i.amount,
"percentage": i.percentage} for i in diary_info.month_data.categories]
color = [i["color"] for i in categories]
def format_amount(amount: int) -> str:
return f"{round(amount / 10000, 2)}w" if amount >= 10000 else amount
evaluate = """const { Pie } = G2Plot;
const data = JSON.parse(`""" + json.dumps(categories) + """`);
const piePlot = new Pie("chartContainer", {
renderer: "svg",
animation: false,
data: data,
appendPadding: 10,
angleField: "amount",
colorField: "name",
radius: 1,
innerRadius: 0.7,
color: JSON.parse(`""" + json.dumps(color) + """`),
meta: {},
label: {
type: "inner",
offset: "-50%",
autoRotate: false,
style: {
textAlign: "center",
fontFamily: "tttgbnumber",
},
formatter: ({ percentage }) => {
return percentage > 2 ? `${percentage}%` : "";
},
},
statistic: {
title: {
offsetY: -18,
content: "总计",
},
content: {
offsetY: -10,
style: {
fontFamily: "tttgbnumber",
},
},
},
legend:false,
});
piePlot.render();"""
ledger_data = {
"uid": client.uid,
"day": diary_info.month,
"current_primogems": format_amount(diary_info.month_data.current_primogems),
"gacha": int(diary_info.month_data.current_primogems / 160),
"current_mora": format_amount(diary_info.month_data.current_mora),
"last_primogems": format_amount(diary_info.month_data.last_primogems),
"last_gacha": int(diary_info.month_data.last_primogems / 160),
"last_mora": format_amount(diary_info.month_data.last_mora),
"categories": categories,
}
png_data = await self.template_service.render('genshin/ledger', "ledger.html", ledger_data,
{"width": 580, "height": 610},
evaluate=evaluate,
auto_escape=False)
return png_data
@error_callable
@restricts(return_data=ConversationHandler.END)
async def command_start(self, update: Update, context: CallbackContext) -> None:
user = update.effective_user
message = update.message
try:
month = check_ledger_month(context)
except IndexError:
reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入")
if filters.ChatType.GROUPS.filter(message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id, 30)
self._add_delete_message_job(context, message.chat_id, message.message_id, 30)
return
Log.info(f"用户 {user.full_name}[{user.id}] 查询原石手扎")
await update.message.reply_chat_action(ChatAction.TYPING)
try:
client = await get_genshin_client(user.id, self.user_service, self.cookies_service)
png_data = await self._start_get_ledger(client, month)
except UserNotFoundError:
reply_message = await message.reply_text("未查询到账号信息,请先私聊派蒙绑定账号")
if filters.ChatType.GROUPS.filter(message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id, 30)
self._add_delete_message_job(context, message.chat_id, message.message_id, 30)
return
except DataNotPublic:
reply_message = await update.message.reply_text("查询失败惹,可能是旅行札记功能被禁用了?")
if filters.ChatType.GROUPS.filter(message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id, 30)
self._add_delete_message_job(context, message.chat_id, message.message_id, 30)
return
await update.message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await update.message.reply_photo(png_data, filename=f"{client.uid}.png", allow_sending_without_reply=True)