Support zzz ledger

This commit is contained in:
omg-xtao 2024-09-25 23:15:46 +08:00 committed by GitHub
parent 38195c6291
commit 7568f5ad69
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 743 additions and 21 deletions

View File

@ -1,7 +1,7 @@
import enum
from pydantic import BaseModel
from simnet.models.starrail.diary import StarRailDiary
from simnet.models.zzz.diary import ZZZDiary
from simnet.models.zzz.chronicle.challenge import ZZZChallenge
from gram_core.services.history_data.models import HistoryData
@ -30,7 +30,7 @@ class HistoryDataAbyss(BaseModel):
class HistoryDataLedger(BaseModel):
diary_data: StarRailDiary
diary_data: ZZZDiary
@classmethod
def from_data(cls, data: HistoryData) -> "HistoryDataLedger":

View File

@ -2,7 +2,7 @@ import datetime
from typing import List
from pytz import timezone
from simnet.models.starrail.diary import StarRailDiary
from simnet.models.zzz.diary import ZZZDiary
from simnet.models.zzz.chronicle.challenge import ZZZChallenge
from core.services.history_data.models import (
@ -60,7 +60,7 @@ class HistoryDataLedgerServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.LEDGER.value
@staticmethod
def create(user_id: int, diary_data: StarRailDiary):
def create(user_id: int, diary_data: ZZZDiary):
data = HistoryDataLedger(diary_data=diary_data)
json_data = data.json(by_alias=True, encoder=json_encoder)
return HistoryData(

View File

@ -22,46 +22,51 @@ class SetCommandPlugin(Plugin):
BotCommand("cancel", "取消操作(解决一切玄学问题)"),
BotCommand("help_raw", "查看文本帮助"),
# gacha_log 相关
BotCommand("signal_log", "查看调频记录"),
BotCommand("signal_log_import", "导入调频记录"),
BotCommand("signal_log_export", "导出调频记录"),
BotCommand("signal_log_delete", "删除调频记录"),
BotCommand("signal_log_online_view", "调频记录在线浏览"),
BotCommand("signal_log_rank", "抽卡排行榜"),
BotCommand("avatars", "查询角色练度"),
BotCommand("player_card", "角色卡片"),
BotCommand("agent_detail", "角色详细信息"),
# Cookie 查询类
BotCommand("sign", "米游社绝区零每日签到"),
BotCommand("dailynote_tasks", "自动便笺提醒"),
BotCommand("challenge", "防卫战信息查询"),
BotCommand("challenge_history", "防卫战历史信息查询"),
# 其他
BotCommand("action_log_import", "导入登录记录"),
BotCommand("setuid", "添加/重设UID"),
BotCommand("setcookie", "添加/重设Cookie"),
BotCommand("player", "管理用户绑定玩家"),
BotCommand("verify", "手动验证"),
BotCommand("daily_note_tasks", "自动便笺提醒"),
BotCommand("cookies_import", "从其他 BOT 导入账号信息"),
BotCommand("cookies_export", "导出账号信息给其他 BOT"),
BotCommand("privacy", "隐私政策"),
]
group_command = [
# 通用
BotCommand("help", "帮助"),
BotCommand("signal_log", "查看调频记录"),
BotCommand("signal_log_online_view", "调频记录在线浏览"),
BotCommand("signal_log_rank", "抽卡排行榜"),
BotCommand("action_log", "查询登录记录"),
BotCommand("dailynote", "查询实时便笺"),
BotCommand("redeem", "(国际服)兑换 Key"),
BotCommand("ledger", "查询当月绳网月报"),
BotCommand("ledger_history", "查询绳网月报历史记录"),
BotCommand("avatars", "查询角色练度"),
BotCommand("player_card", "角色卡片"),
BotCommand("agent_detail", "角色详细信息"),
BotCommand("sign", "米游社绝区零每日签到"),
# Wiki 类
BotCommand("weapon", "查看音擎图鉴"),
BotCommand("avatar", "查询角色攻略"),
BotCommand("challenge", "防卫战信息查询"),
BotCommand("challenge_history", "防卫战历史信息查询"),
# UID 查询类
BotCommand("stats", "玩家统计查询"),
# Cookie 查询类
BotCommand("dailynote", "查询实时便笺"),
BotCommand("cookies_import", "从其他 BOT 导入账号信息"),
BotCommand("cookies_export", "导出账号信息给其他 BOT"),
]
admin_command = [
BotCommand("add_admin", "添加管理员"),
BotCommand("del_admin", "删除管理员"),
BotCommand("refresh_wiki", "刷新Wiki缓存"),
BotCommand("save_entry", "保存条目数据"),
BotCommand("remove_all_entry", "删除全部条目数据"),
BotCommand("sign_all", "全部账号重新签到"),
BotCommand("refresh_all_history", "全部账号刷新历史记录"),
BotCommand("action_log_import_all", "全部账号导入登录记录"),
BotCommand("send_log", "发送日志"),
BotCommand("update", "更新"),
BotCommand("set_command", "重设命令"),

View File

@ -1,6 +1,6 @@
import datetime
from asyncio import sleep
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Dict
from simnet.errors import (
TimedOut as SimnetTimedOut,
@ -21,6 +21,7 @@ from gram_core.services.cookies import CookiesService
from gram_core.services.cookies.models import CookiesStatusEnum
from plugins.zzz.challenge import ChallengePlugin
from plugins.tools.genshin import GenshinHelper, PlayerNotFoundError, CookiesNotFoundError
from plugins.zzz.ledger import LedgerPlugin
from utils.log import logger
if TYPE_CHECKING:
@ -72,6 +73,36 @@ class RefreshHistoryJob(Plugin):
notice_text = NOTICE_TEXT % ("防卫战历史记录", now, uid, "挑战记录")
await self.send_notice(context, user_id, notice_text)
async def _save_ledger_data(self, client: "ZZZClient", year: int, month: int) -> bool:
req_month = f"{year}0{month}" if month < 10 else f"{year}{month}"
diary_info = await client.get_zzz_diary(client.player_id, month=req_month)
return await LedgerPlugin.save_ledger_data(self.history_data_ledger, client.player_id, diary_info)
@staticmethod
def get_ledger_months() -> Dict[int, int]:
now = datetime.datetime.now()
now_time = (now - datetime.timedelta(days=1)) if now.day == 1 and now.hour <= 4 else now
months = {}
last_month = now_time.replace(day=1) - datetime.timedelta(days=1)
months[last_month.month] = last_month.year
last_month = last_month.replace(day=1) - datetime.timedelta(days=1)
months[last_month.month] = last_month.year
return months
async def save_ledger_data(self, client: "ZZZClient") -> bool:
months = self.get_ledger_months()
ok = False
for month, year in months.items():
if await self._save_ledger_data(client, year, month):
ok = True
return ok
async def send_ledger_notice(self, context: "ContextTypes.DEFAULT_TYPE", user_id: int, uid: int):
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
notice_text = NOTICE_TEXT % ("绳网月报历史记录", now, uid, "绳网月报历史记录")
await self.send_notice(context, user_id, notice_text)
@handler.command(command="remove_same_history", block=False, admin=True)
async def remove_same_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
user = update.effective_user
@ -81,6 +112,8 @@ class RefreshHistoryJob(Plugin):
text = "移除相同数据历史记录任务完成\n"
num1 = await self.history_data_abyss.remove_same_data()
text += f"防卫战数据移除数量:{num1}\n"
num2 = await self.history_data_ledger.remove_same_data()
text += f"开拓月历数据移除数量:{num2}\n"
await reply.edit_text(text)
@handler.command(command="refresh_all_history", block=False, admin=True)
@ -104,6 +137,8 @@ class RefreshHistoryJob(Plugin):
async with self.genshin_helper.genshin(user_id) as client:
if await self.save_abyss_data(client):
await self.send_abyss_notice(context, user_id, client.player_id)
if await self.save_ledger_data(client):
await self.send_ledger_notice(context, user_id, client.player_id)
except (InvalidCookies, PlayerNotFoundError, CookiesNotFoundError):
continue
except SimnetBadRequest as exc:

359
plugins/zzz/ledger.py Normal file
View File

@ -0,0 +1,359 @@
import math
import os
import re
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, List, Tuple, Optional
from simnet.errors import BadRequest as SimnetBadRequest, DataNotPublic
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.ext import filters, CallbackContext, ContextTypes
from core.plugin import Plugin, handler
from core.services.cookies import CookiesService
from core.services.history_data.models import HistoryDataLedger
from core.services.history_data.services import HistoryDataLedgerServices
from core.services.template.models import RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
if TYPE_CHECKING:
from simnet import ZZZClient
from simnet.models.zzz.diary import ZZZDiary
__all__ = ("LedgerPlugin",)
class LedgerPlugin(Plugin):
"""绳网月报查询"""
def __init__(
self,
helper: GenshinHelper,
cookies_service: CookiesService,
template_service: TemplateService,
history_data_ledger: HistoryDataLedgerServices,
redis: RedisDB,
):
self.template_service = template_service
self.cookies_service = cookies_service
self.current_dir = os.getcwd()
self.helper = helper
self.history_data_ledger = history_data_ledger
self.cache = RedisCache(redis.client, key="plugin:ledger:history")
self.kitsune = None
async def _start_get_ledger(self, client: "ZZZClient", year, month) -> RenderResult:
req_month = f"{year}0{month}" if month < 10 else f"{year}{month}"
diary_info = await client.get_zzz_diary(client.player_id, month=req_month)
await self.save_ledger_data(self.history_data_ledger, client.player_id, diary_info)
return await self._start_get_ledger_render(client.player_id, diary_info)
async def _start_get_ledger_render(self, uid: int, diary_info: "ZZZDiary") -> RenderResult:
color = ["#73a9c6", "#d56565", "#70b2b4", "#bd9a5a", "#739970", "#7a6da7", "#597ea0"]
categories = [
{
"name": i.name,
"color": color[idx % len(color)],
"amount": i.amount,
"percentage": i.percentage,
}
for idx, i in enumerate(diary_info.month_data.categories)
]
color = [i["color"] for i in categories]
def format_amount(amount: int) -> str:
return f"{round(amount / 10000, 2)}w" if amount >= 10000 else amount
_amount_map = {i.id: i.amount for i in diary_info.month_data.list}
current_hcoin = _amount_map.get("PolychromesData", 0)
current_rails_pass = _amount_map.get("MatserTapeData", 0)
current_boo_pass = _amount_map.get("BooponsData", 0)
ledger_data = {
"uid": mask_number(uid),
"day": diary_info.month,
"current_hcoin": format_amount(current_hcoin),
"gacha": int(current_hcoin / 160),
"current_rails_pass": format_amount(current_rails_pass),
"current_boo_pass": format_amount(current_boo_pass),
"categories": categories,
"color": color,
"nickname": diary_info.role_info.nickname,
"avatar": diary_info.role_info.avatar,
}
render_result = await self.template_service.render(
"zzz/ledger/ledger.html", ledger_data, {"width": 640, "height": 610}
)
return render_result
@handler.command(command="ledger", cookie=True, block=False)
@handler.message(filters=filters.Regex("^绳网月报查询(.*)"), block=False)
async def command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
now = datetime.now()
now_time = (now - timedelta(days=1)) if now.day == 1 and now.hour <= 4 else now
month = now_time.month
try:
args = self.get_args(context)
if len(args) >= 1:
month = args[0].replace("", "")
if re_data := re.findall(r"\d+", str(month)):
month = int(re_data[0])
else:
num_dict = {"": 1, "": 2, "": 3, "": 4, "": 5, "": 6, "": 7, "": 8, "": 9, "": 10}
month = sum(num_dict.get(i, 0) for i in str(month))
# check right
allow_month_year = {now_time.month: now_time.year}
last_month = now_time.replace(day=1) - timedelta(days=1)
allow_month_year[last_month.month] = last_month.year
last_month = last_month.replace(day=1) - timedelta(days=1)
allow_month_year[last_month.month] = last_month.year
if (month not in allow_month_year) or (not isinstance(month, int)):
raise IndexError
year = allow_month_year[month]
except IndexError:
reply_message = await message.reply_text("仅可查询最新三月的数据,请重新输入")
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
return
self.log_user(update, logger.info, "查询绳网月报")
await message.reply_chat_action(ChatAction.TYPING)
try:
async with self.helper.genshin(user_id, player_id=uid, offset=offset) as client:
render_result = await self._start_get_ledger(client, year, month)
except DataNotPublic:
reply_message = await message.reply_text(
"查询失败惹,可能是绳网月报功能被禁用了?请先通过米游社或者 hoyolab 获取一次绳网月报后重试。"
)
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message, delay=30)
self.add_delete_message_job(message, delay=30)
return
except SimnetBadRequest as exc:
if exc.retcode == -120:
await message.reply_text("当前角色等级不足,暂时无法获取信息")
return
raise exc
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await render_result.reply_photo(message, filename=f"{client.player_id}.png")
@staticmethod
async def save_ledger_data(
history_data_ledger: "HistoryDataLedgerServices", uid: int, ledger_data: "ZZZDiary"
) -> bool:
if int(ledger_data.current_month) == ledger_data.month:
return False
model = history_data_ledger.create(uid, ledger_data)
old_data = await history_data_ledger.get_by_user_id_data_id(uid, model.data_id)
if not old_data:
await history_data_ledger.add(model)
return True
return False
async def get_ledger_data(self, uid: int):
return await self.history_data_ledger.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataLedger") -> str:
return f"{data.diary_data.data_id}"
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_ledger_data(uid)
data.sort(key=lambda x: x.data_id, reverse=True)
abyss_data = [HistoryDataLedger.from_data(i) for i in data]
buttons = [
{
"name": LedgerPlugin.get_season_data_name(abyss_data[idx]),
"value": f"get_ledger_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
send_buttons = all_buttons[(page - 1) * 5 : page * 5]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 5)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_ledger_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_ledger_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_ledger_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@handler.command("ledger_history", cookie=True, block=False)
@handler.message(filters.Regex(r"^绳网月报历史数据"), block=False)
async def ledger_history_command_start(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
self.log_user(update, logger.info, "查询绳网月报历史数据")
async with self.helper.genshin(user_id, player_id=uid, offset=offset) as client:
await self.get_session_button_data(user_id, client.player_id, force=True)
buttons = await self.gen_season_button(user_id, client.player_id)
if not buttons:
await message.reply_text("还没有绳网月报历史数据哦~")
return
if isinstance(self.kitsune, str):
photo = self.kitsune
else:
photo = open("resources/img/aaa.jpg", "rb")
reply_message = await message.reply_photo(
photo, "请选择要查询的绳网月报历史数据", reply_markup=InlineKeyboardMarkup(buttons)
)
if reply_message.photo:
self.kitsune = reply_message.photo[-1].file_id
async def get_ledger_history_page(self, update: "Update", user_id: int, uid: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换绳网月报历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin(user_id, player_id=uid) as client:
buttons = await self.gen_season_button(user_id, client.player_id, page)
if not buttons:
await callback_query.answer("还没有绳网月报历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有绳网月报历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
@handler.callback_query(pattern=r"^get_ledger_history\|", block=False)
async def get_ledger_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
message = callback_query.message
user = callback_query.from_user
async def get_ledger_history_callback(
callback_query_data: str,
) -> Tuple[str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
logger.debug(
"callback_query_data函数返回 result[%s] user_id[%s] uid[%s]",
_result,
_user_id,
_uid,
)
return _result, _user_id, _uid
result, user_id, uid = await get_ledger_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_ledger_history_page(update, user_id, uid, result)
return
data_id = int(result)
data = await self.history_data_ledger.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮")
render = await self._start_get_ledger_render(user_id, HistoryDataLedger.from_data(data).diary_data)
await render.edit_media(message)
async def ledger_use_by_inline(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"):
callback_query = update.callback_query
user = update.effective_user
user_id = user.id
uid = IInlineUseData.get_uid_from_context(context)
self.log_user(update, logger.info, "查询绳网月报")
now = datetime.now()
now_time = (now - timedelta(days=1)) if now.day == 1 and now.hour <= 4 else now
year, month = now_time.year, now_time.month
try:
async with self.helper.genshin(user_id, player_id=uid) as client:
render_result = await self._start_get_ledger(client, year, month)
except DataNotPublic:
await callback_query.answer(
"查询失败惹,可能是绳网月报功能被禁用了?请先通过米游社或者 hoyolab 获取一次绳网月报后重试。",
show_alert=True,
)
return
except SimnetBadRequest as exc:
if exc.ret_code == -120:
await callback_query.answer(
"当前角色等级不足,暂时无法获取信息",
show_alert=True,
)
return
raise exc
await render_result.edit_inline_media(callback_query)
async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]:
return [
IInlineUseData(
text="当月绳网月报",
hash="ledger",
callback=self.ledger_use_by_inline,
cookie=True,
player=True,
)
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 144 KiB

46
resources/zzz/ledger/g2plot.min.js vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,107 @@
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="content-type" content="text/html;charset=utf-8" />
<link rel="shortcut icon" href="#" />
<link rel="stylesheet" type="text/css" href="ledger.css" />
<link rel="preload" href="../../fonts/tttgbnumber.ttf" as="font">
<link rel="preload" href="background/loading-bg-m.png" as="image">
<link rel="preload" href="background/chart.png" as="image">
</head>
<body>
<div class="container" id="container">
<div class="title-box">
<div>
<div class="avatar-container">
<img class="Avatar" src="{{ avatar }}" alt="Avatar" />
<div class="nickname">{{ nickname }}</div>
</div>
<div class="info">
<div class="uid">ID{{uid}}</div>
<div class="month">{{day}} 月绳网月报</div>
</div>
</div>
</div>
<div class="data-box">
<div class="month">
<div class="head">收入一览:</div>
<div class="primogems">
<div class="icon icon-xq"></div>
<div class="text">菲林:{{ current_hcoin }} | {{ gacha }} 抽</div>
</div>
<div class="primogems">
<div class="icon icon-ticket"></div>
<div class="text">原装/加密母带:{{ current_rails_pass }}</div>
</div>
<div class="primogems">
<div class="icon icon-boo"></div>
<div class="text">邦布劵:{{ current_boo_pass }}</div>
</div>
</div>
</div>
<div class="chart-box">
<div class="head">菲林收入组成:</div>
<div class="chart-info">
<div id="chartContainer"></div>
<ul class="tooltip">
{% for category in categories %}
<li>
<i style="background: {{ category.color }}"></i>
<span class="action">{{ category.name }}</span> <em>{{ category.percentage }}%</em><span class="num">{{ category.amount }}</span>
</li>
{% endfor %}
</ul>
</div>
</div>
</div>
<script type="text/javascript" src="g2plot.min.js"></script>
<script>
const { Pie } = G2Plot;
const data = {{ categories | tojson }};
const color = {{ color | tojson }};
const piePlot = new Pie("chartContainer", {
renderer: "svg",
animation: false,
data,
appendPadding: 10,
angleField: "amount",
colorField: "name",
radius: 1,
innerRadius: 0.7,
color,
meta: {},
label: {
type: "inner",
offset: "-50%",
autoRotate: false,
style: {
textAlign: "center",
fontFamily: "tttgbnumber",
},
formatter: ({ percentage }) => {
return percentage > 2 ? `${percentage}%` : "";
},
},
statistic: {
title: {
offsetY: -18,
content: "总计",
style: {
color: "white",
}
},
content: {
offsetY: -10,
style: {
fontFamily: "tttgbnumber",
color: "white",
},
},
},
legend:false,
});
piePlot.render();
</script>
</body>
</html>