mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-16 12:02:16 +00:00
235 lines
8.2 KiB
Python
235 lines
8.2 KiB
Python
import contextlib
|
|
from pathlib import Path
|
|
from typing import Tuple, Optional, List, Dict
|
|
|
|
import aiofiles
|
|
from genshin import Client, AuthkeyTimeout, InvalidAuthkey
|
|
from genshin.models import TransactionKind, BaseTransaction
|
|
|
|
from modules.pay_log.error import PayLogAuthkeyTimeout, PayLogInvalidAuthkey, PayLogNotFound
|
|
from modules.pay_log.models import PayLog as PayLogModel, BaseInfo
|
|
from utils.const import PROJECT_ROOT
|
|
|
|
# Use ujson when it is installed, otherwise fall back to the standard-library
# json module. Either way the rest of this module refers to it as ``jsonlib``.
try:
    import ujson as jsonlib

except ImportError:
    import json as jsonlib
|
|
|
|
# Directory that holds every user's pay-log JSON file; it is created at import
# time (including missing parents) so later reads and writes can assume it exists.
PAY_LOG_PATH = PROJECT_ROOT / "data" / "apihelper" / "pay_log"
PAY_LOG_PATH.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
class PayLog:
    """Load, update, persist and analyse per-user pay (crystal top-up) logs.

    Every log lives in ``pay_log_path`` as ``<user_id>-<uid>.json``; the
    previous version of the file is kept next to it as
    ``<user_id>-<uid>.json.bak``.
    """

    def __init__(self, pay_log_path: Path = PAY_LOG_PATH):
        """
        :param pay_log_path: directory in which the log files are stored
        """
        self.pay_log_path = pay_log_path

    @staticmethod
    async def load_json(path):
        """Read the UTF-8 text file at ``path`` and return the decoded JSON value.

        :param path: file to read
        :return: the object produced by ``jsonlib.loads``
        """
        async with aiofiles.open(path, "r", encoding="utf-8") as f:
            return jsonlib.loads(await f.read())

    @staticmethod
    async def save_json(path, data: PayLogModel):
        """Serialize ``data`` with its ``json()`` method and write it to ``path``.

        :param path: file to (over)write, encoded as UTF-8
        :param data: model to serialize (non-ASCII kept as-is, indent of 4)
        :return: whatever the underlying ``write`` call returns
        """
        async with aiofiles.open(path, "w", encoding="utf-8") as f:
            return await f.write(data.json(ensure_ascii=False, indent=4))

    def get_file_path(
        self,
        user_id: str,
        uid: str,
        bak: bool = False,
    ) -> Path:
        """Build the path of a user's log file.

        :param user_id: user ID
        :param uid: game UID
        :param bak: when true, return the path of the backup (``.bak``) file
        :return: path inside ``self.pay_log_path``
        """
        return self.pay_log_path / f"{user_id}-{uid}.json{'.bak' if bak else ''}"

    async def load_history_info(
        self,
        user_id: str,
        uid: str,
        only_status: bool = False,
    ) -> Tuple[Optional[PayLogModel], bool]:
        """Load the stored log of a user.

        :param user_id: user ID
        :param uid: game UID
        :param only_status: when true, skip reading and only report whether the file exists
        :return: ``(log, found)``. ``log`` is ``None`` when ``only_status`` is set;
            when the file is missing or is not valid JSON an empty log for
            ``uid`` is returned together with ``False``.
        """
        file_path = self.get_file_path(user_id, uid)
        if only_status:
            return None, file_path.exists()
        if not file_path.exists():
            return PayLogModel(info=BaseInfo(uid=uid), list=[]), False
        try:
            return PayLogModel.parse_obj(await self.load_json(file_path)), True
        except jsonlib.JSONDecodeError:
            # A corrupt file is treated the same way as a missing one.
            return PayLogModel(info=BaseInfo(uid=uid), list=[]), False

    async def remove_history_info(
        self,
        user_id: str,
        uid: str,
    ) -> bool:
        """Delete the stored log of a user together with its backup.

        :param user_id: user ID
        :param uid: game UID
        :return: ``True`` when the main log file was deleted, ``False`` when it
            did not exist or could not be removed because of permissions
        """
        file_path = self.get_file_path(user_id, uid)
        file_bak_path = self.get_file_path(user_id, uid, bak=True)
        # Removing the backup is best effort: a failure here must not prevent
        # the main file from being removed.
        with contextlib.suppress(OSError):
            file_bak_path.unlink(missing_ok=True)
        try:
            file_path.unlink()
        except (FileNotFoundError, PermissionError):
            return False
        return True

    async def save_pay_log_info(self, user_id: str, uid: str, info: PayLogModel) -> None:
        """Persist a log, keeping the previously stored version as a backup.

        :param user_id: user ID
        :param uid: game UID
        :param info: log data to write
        """
        save_path = self.get_file_path(user_id, uid)
        save_path_bak = self.get_file_path(user_id, uid, bak=True)
        # Turn the current file into the backup. ``replace`` overwrites an
        # existing backup in one step; a permission problem only means that no
        # backup is taken, the new data is still written below.
        with contextlib.suppress(PermissionError):
            if save_path.exists():
                save_path.replace(save_path_bak)
        await self.save_json(save_path, info)

    async def get_log_data(
        self,
        user_id: int,
        client: Client,
        authkey: str,
    ) -> int:
        """Fetch the crystal transaction log with ``authkey`` and merge it into the stored log.

        The merged log is sorted newest first and saved when at least one new
        record was fetched or a stored log already existed.

        :param user_id: user ID
        :param client: genshin client; ``client.uid`` selects the log file
        :param authkey: authkey used for the transaction-log request
        :return: number of records that were not in the stored log
        :raises PayLogAuthkeyTimeout: the client reported ``AuthkeyTimeout``
        :raises PayLogInvalidAuthkey: the client reported ``InvalidAuthkey``
        """
        new_num = 0
        pay_log, have_old = await self.load_history_info(str(user_id), str(client.uid))
        # A set keeps the per-record membership test O(1).
        history_ids = {record.id for record in pay_log.list}
        try:
            async for data in client.transaction_log(TransactionKind.CRYSTAL, authkey=authkey):
                if data.id not in history_ids:
                    pay_log.list.append(data)
                    new_num += 1
        except AuthkeyTimeout as exc:
            raise PayLogAuthkeyTimeout from exc
        except InvalidAuthkey as exc:
            raise PayLogInvalidAuthkey from exc
        if new_num > 0 or have_old:
            pay_log.list.sort(key=lambda x: (x.time, x.id), reverse=True)
            pay_log.info.update_now()
            await self.save_pay_log_info(str(user_id), str(client.uid), pay_log)
        return new_num

    @staticmethod
    async def get_month_data(pay_log: PayLogModel, price_data: List[Dict]) -> Tuple[int, List[Dict]]:
        """Aggregate the positive records of a log per calendar month.

        Only the first six distinct months encountered while walking
        ``pay_log.list`` get an entry in the result; records of later months
        still count towards the total amount and the ``price_data`` counters.
        Months are told apart by year *and* month, so records from the same
        month of another year are never added to the wrong entry.

        :param pay_log: log data; records with ``amount <= 0`` are ignored
        :param price_data: list of ``{"price": [...], "count": int}`` dicts; the
            ``count`` of the first entry whose ``price`` list contains a
            record's amount is incremented in place
        :return: ``(total amount, [{"month": "<n>月", "amount": int}, ...])``
        :raises PayLogNotFound: no record with a positive amount exists
        """
        max_months = 6
        all_amount: int = 0
        month_datas: List[Dict] = []
        # (year, month) -> result entry, or None for months beyond the limit.
        buckets: Dict[Tuple[int, int], Optional[Dict]] = {}
        for record in pay_log.list:
            if record.amount <= 0:
                continue
            all_amount += record.amount
            key = (record.time.year, record.time.month)
            if key not in buckets:
                bucket: Optional[Dict] = None
                if len(buckets) < max_months:
                    bucket = {
                        "month": f"{record.time.month}月",
                        "amount": 0,
                    }
                    month_datas.append(bucket)
                buckets[key] = bucket
            bucket = buckets[key]
            if bucket is not None:
                bucket["amount"] += record.amount
            for price in price_data:
                if record.amount in price["price"]:
                    price["count"] += 1
                    break
        if not month_datas:
            raise PayLogNotFound
        return all_amount, month_datas

    async def get_analysis(self, user_id: int, client: Client):
        """Build the summary, bar-chart and pie-chart data for a user's stored log.

        :param user_id: user ID
        :param client: genshin client; ``client.uid`` selects the log file
        :return: dict with the keys ``uid``, ``datas``, ``bar_data`` and ``pie_data``
        :raises PayLogNotFound: no stored log exists or it has no positive record
        """
        pay_log, status = await self.load_history_info(str(user_id), str(client.uid))
        if not status:
            raise PayLogNotFound
        # Crystal amounts per product; where two amounts are listed they are the
        # single and the doubled crystal amount of the same product.
        price_data = [
            {
                "price": price,
                "count": 0,
            }
            for price in [[680], [300], [8080, 12960], [3880, 6560], [2240, 3960], [1090, 1960], [330, 600], [60, 120]]
        ]
        price_data_name = ["大月卡", "小月卡", "648", "328", "198", "98", "30", "6"]
        real_price = [68, 30, 648, 328, 198, 98, 30, 6]
        all_amount, month_datas = await PayLog.get_month_data(pay_log, price_data)
        # ``max`` returns the first of several equal maxima, i.e. the same entry
        # a stable descending sort would put first.
        top_month = max(month_datas, key=lambda k: k["amount"])
        # (name, price per purchase, number of purchases) for every product.
        products = [
            (name, price, entry["count"]) for name, price, entry in zip(price_data_name, real_price, price_data)
        ]
        all_pay = sum(count * price for _, price, count in products)
        datas = [
            {"value": f"¥{all_pay:.0f}", "name": "总消费"},
            {"value": all_amount, "name": "总结晶"},
            {"value": f"{top_month['month']}", "name": "消费最多"},
            {
                "value": f"¥{top_month['amount'] / 10:.0f}",
                "name": f"{top_month['month']}消费",
            },
            *[
                {
                    # The first product always shows "*" instead of its count.
                    "value": count if index != 0 else "*",
                    "name": f"{name}",
                }
                for index, (name, _, count) in enumerate(products)
            ],
        ]
        pie_datas = [
            {
                "value": f"{count * price:.0f}",
                "name": f"{name}",
            }
            for name, price, count in products
            if count > 0
        ]
        return {
            "uid": client.uid,
            "datas": datas,
            "bar_data": month_datas,
            "pie_data": pie_datas,
        }