PamGram/modules/pay_log/log.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

235 lines
8.2 KiB
Python
Raw Normal View History

2023-01-07 08:01:31 +00:00
import contextlib
from pathlib import Path
from typing import Tuple, Optional, List, Dict
import aiofiles
from genshin import Client, AuthkeyTimeout, InvalidAuthkey
from genshin.models import TransactionKind, BaseTransaction
from modules.pay_log.error import PayLogAuthkeyTimeout, PayLogInvalidAuthkey, PayLogNotFound
from modules.pay_log.models import PayLog as PayLogModel, BaseInfo
from utils.const import PROJECT_ROOT
# Prefer ujson when it is installed; otherwise fall back to the standard
# library json module under the same alias.
try:
    import ujson as jsonlib
except ImportError:
    import json as jsonlib

# Directory that holds the per-user pay log files; it is created at import
# time when missing.
PAY_LOG_PATH = PROJECT_ROOT / "data" / "apihelper" / "pay_log"
PAY_LOG_PATH.mkdir(parents=True, exist_ok=True)
class PayLog:
    """File-backed store and analyser for a user's crystal transaction log.

    Each log is kept as ``<user_id>-<uid>.json`` inside ``pay_log_path``; the
    previous version of the file is kept next to it as
    ``<user_id>-<uid>.json.bak``.
    """

    def __init__(self, pay_log_path: Path = PAY_LOG_PATH):
        self.pay_log_path = pay_log_path

    @staticmethod
    async def load_json(path):
        """Read the UTF-8 text file at ``path`` and return the parsed JSON value."""
        async with aiofiles.open(path, "r", encoding="utf-8") as f:
            return jsonlib.loads(await f.read())

    @staticmethod
    async def save_json(path, data: PayLogModel):
        """Write ``data`` to ``path`` as indented JSON without ASCII escaping.

        :return: whatever the underlying ``write`` call returns
        """
        async with aiofiles.open(path, "w", encoding="utf-8") as f:
            return await f.write(data.json(ensure_ascii=False, indent=4))

    def get_file_path(
        self,
        user_id: str,
        uid: str,
        bak: bool = False,
    ):
        """Build the path of a log file.

        :param user_id: user ID
        :param uid: UID
        :param bak: whether to return the path of the backup file
        :return: file path
        """
        return self.pay_log_path / f"{user_id}-{uid}.json{'.bak' if bak else ''}"

    async def load_history_info(
        self,
        user_id: str,
        uid: str,
        only_status: bool = False,
    ) -> Tuple[Optional[PayLogModel], bool]:
        """Load the stored log of a user.

        :param user_id: user ID
        :param uid: UID
        :param only_status: only report whether the file exists, without reading it
        :return: ``(log, found)``. With ``only_status`` the log is ``None``.
            When the file is missing or is not valid JSON, an empty log for
            ``uid`` is returned together with ``False``.
        """
        file_path = self.get_file_path(user_id, uid)
        if only_status:
            return None, file_path.exists()
        if not file_path.exists():
            return PayLogModel(info=BaseInfo(uid=uid), list=[]), False
        try:
            return PayLogModel.parse_obj(await self.load_json(file_path)), True
        except jsonlib.JSONDecodeError:
            return PayLogModel(info=BaseInfo(uid=uid), list=[]), False

    async def remove_history_info(
        self,
        user_id: str,
        uid: str,
    ) -> bool:
        """Delete the stored log of a user together with its backup.

        :param user_id: user ID
        :param uid: UID
        :return: ``True`` when the log file was deleted, ``False`` when it did
            not exist or could not be deleted for lack of permission
        """
        # The backup is removed on a best-effort basis: failing to delete it
        # must not prevent removal of the main file.
        with contextlib.suppress(OSError):
            self.get_file_path(user_id, uid, bak=True).unlink(missing_ok=True)
        # Attempt the deletion directly instead of checking ``exists()`` first,
        # so a file that disappears in between is reported as "not deleted"
        # rather than raising.
        try:
            self.get_file_path(user_id, uid).unlink()
        except (FileNotFoundError, PermissionError):
            return False
        return True

    async def save_pay_log_info(self, user_id: str, uid: str, info: PayLogModel) -> None:
        """Persist a log, keeping the previously stored file as a backup.

        :param user_id: user ID
        :param uid: UID
        :param info: log data to store
        """
        save_path = self.get_file_path(user_id, uid)
        # Turn the current file (if any) into the backup. ``replace`` overwrites
        # an existing backup in one step; a missing current file or a permission
        # problem simply means no backup is taken.
        with contextlib.suppress(FileNotFoundError, PermissionError):
            save_path.replace(self.get_file_path(user_id, uid, bak=True))
        # Write the new data.
        await self.save_json(save_path, info)

    async def get_log_data(
        self,
        user_id: int,
        client: Client,
        authkey: str,
    ) -> int:
        """Fetch the crystal transaction log with an authkey and merge it into the stored log.

        :param user_id: user ID
        :param client: genshin client
        :param authkey: authkey
        :return: number of records that were not stored before
        :raises PayLogAuthkeyTimeout: the client reported ``AuthkeyTimeout``
        :raises PayLogInvalidAuthkey: the client reported ``InvalidAuthkey``
        """
        new_num = 0
        pay_log, have_old = await self.load_history_info(str(user_id), str(client.uid))
        # A set gives constant-time membership tests; it is updated while
        # merging so that an id repeated within one fetch is stored only once.
        known_ids = {record.id for record in pay_log.list}
        try:
            async for data in client.transaction_log(TransactionKind.CRYSTAL, authkey=authkey):
                if data.id in known_ids:
                    continue
                known_ids.add(data.id)
                pay_log.list.append(data)
                new_num += 1
        except AuthkeyTimeout as exc:
            raise PayLogAuthkeyTimeout from exc
        except InvalidAuthkey as exc:
            raise PayLogInvalidAuthkey from exc
        if new_num > 0 or have_old:
            # Newest first; the id breaks ties between records with equal time.
            pay_log.list.sort(key=lambda x: (x.time, x.id), reverse=True)
            pay_log.info.update_now()
            await self.save_pay_log_info(str(user_id), str(client.uid), pay_log)
        return new_num

    @staticmethod
    async def get_month_data(pay_log: PayLogModel, price_data: List[Dict]) -> Tuple[int, List[Dict]]:
        """Aggregate the positive-amount records of a log per month.

        Records are processed in list order. Only the first six distinct months
        encountered get an entry in the result; later records still count
        towards the total and towards ``price_data``.

        :param pay_log: log data
        :param price_data: product entries with ``"price"`` (a collection of
            amounts) and ``"count"``; the ``"count"`` of the first entry whose
            ``"price"`` contains a record's amount is incremented in place
        :return: ``(total amount, month entries)`` where each month entry is
            ``{"month": <month number as str>, "amount": <sum for that month>}``
        :raises PayLogNotFound: the log holds no record with a positive amount
        """
        all_amount: int = 0
        # Months are identified by (year, month) so that the same calendar
        # month of another year starts its own entry instead of being added
        # to whichever month happens to be current.
        seen_months = set()
        month_datas: List[Dict] = []
        current_month: Optional[Dict] = None
        for record in pay_log.list:
            if record.amount <= 0:
                continue
            all_amount += record.amount
            month_key = (record.time.year, record.time.month)
            if month_key not in seen_months:
                seen_months.add(month_key)
                if len(seen_months) <= 6:
                    current_month = {
                        "month": f"{record.time.month}",
                        "amount": 0,
                    }
                    month_datas.append(current_month)
                else:
                    # Beyond the sixth month nothing more is reported per month.
                    current_month = None
            for product in price_data:
                if record.amount in product["price"]:
                    product["count"] += 1
                    break
            if current_month is not None:
                current_month["amount"] += record.amount
        if not month_datas:
            raise PayLogNotFound
        return all_amount, month_datas

    async def get_analysis(self, user_id: int, client: Client):
        """Build the analysis data for a user's stored log.

        :param user_id: user ID
        :param client: genshin client
        :return: dict with ``"uid"``, ``"datas"`` (summary entries),
            ``"bar_data"`` (month entries) and ``"pie_data"`` (spending per
            product, only for products bought at least once)
        :raises PayLogNotFound: no stored log exists, or it holds no record
            with a positive amount
        """
        pay_log, status = await self.load_history_info(str(user_id), str(client.uid))
        if not status:
            raise PayLogNotFound
        # Crystal amounts per product (single and double amounts).
        price_data = [
            {
                "price": price,
                "count": 0,
            }
            for price in [[680], [300], [8080, 12960], [3880, 6560], [2240, 3960], [1090, 1960], [330, 600], [60, 120]]
        ]
        price_data_name = ["大月卡", "小月卡", "648", "328", "198", "98", "30", "6"]
        real_price = [68, 30, 648, 328, 198, 98, 30, 6]
        all_amount, month_datas = await PayLog.get_month_data(pay_log, price_data)
        # Month with the largest amount; on ties the earliest entry wins.
        top_month = max(month_datas, key=lambda k: k["amount"])
        all_pay = sum(product["count"] * price for product, price in zip(price_data, real_price))
        datas = [
            {"value": f"{all_pay:.0f}", "name": "总消费"},
            {"value": all_amount, "name": "总结晶"},
            {"value": f"{top_month['month']}", "name": "消费最多"},
            {
                "value": f"{top_month['amount'] / 10:.0f}",
                "name": f"{top_month['month']}消费",
            },
            *[
                {
                    # The first product is always shown as "*" instead of its count.
                    "value": product["count"] if index != 0 else "*",
                    "name": f"{name}",
                }
                for index, (product, name) in enumerate(zip(price_data, price_data_name))
            ],
        ]
        pie_datas = [
            {
                "value": f"{product['count'] * price:.0f}",
                "name": f"{name}",
            }
            for product, name, price in zip(price_data, price_data_name, real_price)
            if product["count"] > 0
        ]
        return {
            "uid": client.uid,
            "datas": datas,
            "bar_data": month_datas,
            "pie_data": pie_datas,
        }