PamGram/modules/gacha_log/log.py
2024-09-12 21:12:23 +08:00

679 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import contextlib
import datetime
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING
import aiofiles
from simnet import StarRailClient, Region
from simnet.errors import AuthkeyTimeout, InvalidAuthkey
from simnet.models.starrail.wish import StarRailBannerType
from simnet.utils.player import recognize_starrail_server
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from metadata.pool.pool import get_pool_by_id
from modules.gacha_log.const import GACHA_TYPE_LIST
from modules.gacha_log.error import (
GachaLogAccountNotFound,
GachaLogAuthkeyTimeout,
GachaLogException,
GachaLogFileError,
GachaLogInvalidAuthkey,
GachaLogMixedProvider,
GachaLogNotFound,
)
from modules.gacha_log.models import (
FiveStarItem,
FourStarItem,
GachaItem,
GachaLogInfo,
ImportType,
Pool,
SRGFInfo,
SRGFItem,
SRGFModel,
)
from modules.gacha_log.online_view import GachaLogOnlineView
from modules.gacha_log.ranks import GachaLogRanks
from utils.const import PROJECT_ROOT
from utils.uid import mask_number
if TYPE_CHECKING:
from core.dependence.assets import AssetsService
# Default directory for the per-user warp record JSON files handled by GachaLog.
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "warp_log")
# Created at import time (including missing parents); a no-op when it already exists.
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
class GachaLog(GachaLogOnlineView, GachaLogRanks):
def __init__(
    self,
    gacha_log_path: Path = GACHA_LOG_PATH,
    gacha_log_rank_service: GachaLogRankService = None,
):
    """Initialise both base classes and remember where record files live.

    :param gacha_log_path: directory used to build every record file path
    :param gacha_log_rank_service: forwarded unchanged to ``GachaLogRanks.__init__``
    """
    # The bases are initialised explicitly, one by one, instead of through super().
    GachaLogOnlineView.__init__(self)
    GachaLogRanks.__init__(self, gacha_log_rank_service)
    self.gacha_log_path = gacha_log_path
@staticmethod
async def load_json(path):
    """Read a UTF-8 text file asynchronously and return its decoded JSON content.

    :param path: location of the file to read
    :return: the value produced by ``json.loads`` for the file's text
    """
    async with aiofiles.open(path, "r", encoding="utf-8") as reader:
        raw_text = await reader.read()
    return json.loads(raw_text)
@staticmethod
async def save_json(path, data):
    """Write ``data`` to ``path`` as UTF-8 text, overwriting any existing file.

    :param path: location of the file to write
    :param data: a dict (serialised as indented, non-ASCII-escaped JSON) or
        anything else, which is written as-is
    :return: the result of the write call for dict input, otherwise ``None``
    """
    async with aiofiles.open(path, "w", encoding="utf-8") as writer:
        if not isinstance(data, dict):
            await writer.write(data)
            return None
        serialised = json.dumps(data, ensure_ascii=False, indent=4)
        return await writer.write(serialised)
async def load_history_info(
    self, user_id: str, uid: str, only_status: bool = False
) -> Tuple[Optional[GachaLogInfo], bool]:
    """Load a player's stored warp record history.

    :param user_id: user id
    :param uid: in-game player uid
    :param only_status: when true, only report whether the record file exists
    :return: ``(records, found)``. With ``only_status`` the first element is
        ``None``. When the file is missing or is not valid JSON, a fresh empty
        record object is returned together with ``False``.
    """
    file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
    if only_status:
        return None, file_path.exists()

    def blank_record():
        # Built lazily so the timestamp reflects the moment the fallback is needed.
        return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now())

    if not file_path.exists():
        return blank_record(), False
    try:
        stored = await self.load_json(file_path)
        return GachaLogInfo.parse_obj(stored), True
    except json.decoder.JSONDecodeError:
        # Only JSON syntax errors are treated as "no usable history".
        return blank_record(), False
async def remove_history_info(self, user_id: str, uid: str) -> bool:
    """Delete a player's stored warp record history.

    The backup copy and exported files are removed on a best-effort basis:
    any error while deleting them is ignored and does not affect the result.

    :param user_id: user id
    :param uid: in-game player uid
    :return: ``True`` when the main record file existed and was deleted,
        ``False`` when it was missing or could not be deleted
    """
    stem = f"{user_id}-{uid}"
    file_path = self.gacha_log_path / f"{stem}.json"
    # Side files: the ".bak" copy made by save_gacha_log_info, the SRGF export
    # written by gacha_log_to_srgf ("-srgf.json"), and the "-uigf.json" name this
    # method targeted previously (kept so such leftovers are still cleaned up).
    side_files = (
        self.gacha_log_path / f"{stem}.json.bak",
        self.gacha_log_path / f"{stem}-srgf.json",
        self.gacha_log_path / f"{stem}-uigf.json",
    )
    for side_file in side_files:
        with contextlib.suppress(Exception):
            side_file.unlink(missing_ok=True)
    if not file_path.exists():
        return False
    try:
        file_path.unlink()
    except PermissionError:
        return False
    return True
async def move_history_info(self, user_id: str, uid: str, new_user_id: str) -> bool:
    """Hand a stored record file over to another user by renaming it.

    :param user_id: current owner's user id
    :param uid: in-game player uid
    :param new_user_id: user id that should own the file afterwards
    :return: ``True`` when the file was renamed; ``False`` when the source is
        missing, the destination already exists, or the rename is not permitted
    """
    source = self.gacha_log_path / f"{user_id}-{uid}.json"
    target = self.gacha_log_path / f"{new_user_id}-{uid}.json"
    # Only move when there is something to move and nothing would be overwritten.
    if source.exists() and not target.exists():
        try:
            source.rename(target)
        except PermissionError:
            return False
        return True
    return False
async def save_gacha_log_info(self, user_id: str, uid: str, info: GachaLogInfo):
    """Persist a player's warp records, keeping the previous file as a backup.

    :param user_id: user id
    :param uid: in-game player uid
    :param info: record data to store; written as the text returned by ``info.json()``
    """
    file_name = f"{user_id}-{uid}.json"
    save_path = self.gacha_log_path / file_name
    backup_path = self.gacha_log_path / f"{file_name}.bak"
    # Rotate the current file into the single ".bak" slot. A PermissionError here
    # is ignored so the new data is still written.
    with contextlib.suppress(PermissionError):
        if save_path.exists():
            if backup_path.exists():
                backup_path.unlink()
            save_path.rename(backup_path)
    # Write the new data.
    await self.save_json(save_path, info.json())
async def gacha_log_to_srgf(self, user_id: str, uid: str) -> Optional[Path]:
    """Export a player's stored warp records to an SRGF JSON file.

    :param user_id: user id
    :param uid: in-game player uid
    :return: path of the written ``<user_id>-<uid>-srgf.json`` file
    :raises GachaLogNotFound: when no stored history could be loaded
    """
    history, found = await self.load_history_info(user_id, uid)
    if not found:
        raise GachaLogNotFound
    save_path = self.gacha_log_path / f"{user_id}-{uid}-srgf.json"
    header = SRGFInfo(uid=uid, export_app=ImportType.PaiGram.value, export_app_version="v3")
    export = SRGFModel(info=header, list=[])
    # Flatten every pool's records into the single SRGF list, pool by pool.
    export.list.extend(
        SRGFItem(
            id=record.id,
            name=record.name,
            gacha_id=record.gacha_id,
            gacha_type=record.gacha_type,
            item_id=record.item_id,
            item_type=record.item_type,
            rank_type=record.rank_type,
            time=record.time.strftime("%Y-%m-%d %H:%M:%S"),
        )
        for pool_records in history.item_list.values()
        for record in pool_records
    )
    # Round-trip through JSON text so save_json receives a plain dict.
    await self.save_json(save_path, json.loads(export.json()))
    return save_path
@staticmethod
async def verify_data(data: List[GachaItem]) -> bool:
    """Sanity-check a list of warp records before it is merged or stored.

    Lists of 50 records or fewer are always accepted. Larger lists are rejected
    when five-star records make up at least one in fifteen of all records, or
    when there are fewer four-star than five-star records.

    :param data: records to check
    :return: ``True`` when the list looks plausible
    :raises GachaLogFileError: when the list is implausible (with a user-facing
        message) or when the records cannot be inspected at all
    """
    try:
        total = len(data)
        five_star = sum(1 for i in data if i.rank_type == "5")
        four_star = sum(1 for i in data if i.rank_type == "4")
    except Exception as exc:  # pylint: disable=W0703
        # Malformed input (wrong type, missing attribute, ...) is reported as a file error.
        raise GachaLogFileError from exc
    # Raised outside the try block so the message is not replaced by an empty
    # GachaLogFileError from the generic handler above.
    if total > 50 and (total <= five_star * 15 or four_star < five_star):
        raise GachaLogFileError(
            "检测到您将要导入的跃迁记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
        )
    return True
@staticmethod
def import_data_backend(all_items: List[GachaItem], gacha_log: GachaLogInfo, temp_id_data: Dict) -> int:
    """Merge imported records into ``gacha_log``, skipping ids that are already known.

    Both ``gacha_log.item_list`` and ``temp_id_data`` are updated in place.

    :param all_items: records to import
    :param gacha_log: stored history that receives the new records
    :param temp_id_data: mapping of pool name to the list of ids already stored
        for that pool; new ids are appended to these lists
    :return: number of records that were added
    """
    new_num = 0
    # Per-pool id sets give constant-time duplicate checks; the caller's id lists
    # are still appended to, so the visible side effects stay the same.
    seen_by_pool: Dict[str, set] = {}
    for item_info in all_items:
        pool_name = GACHA_TYPE_LIST[StarRailBannerType(int(item_info.gacha_type))]
        known_ids = temp_id_data.setdefault(pool_name, [])
        pool_items = gacha_log.item_list.setdefault(pool_name, [])
        seen = seen_by_pool.get(pool_name)
        if seen is None:
            seen = seen_by_pool[pool_name] = set(known_ids)
        if item_info.id in seen:
            continue
        pool_items.append(item_info)
        known_ids.append(item_info.id)
        seen.add(item_info.id)
        new_num += 1
    return new_num
async def import_gacha_log_data(self, user_id: int, player_id: int, data: dict, verify_uid: bool = True) -> int:
    """Merge an exported record file into the stored history and save the result.

    :param user_id: user id
    :param player_id: in-game player uid the import is meant for
    :param data: parsed export; ``data["info"]["uid"]``, ``data["info"]["export_app"]``
        and ``data["list"]`` are read
    :param verify_uid: when true, the uid inside the file must equal ``player_id``;
        when false, ``player_id`` is used regardless of the file's uid
    :return: number of newly added records
    :raises GachaLogAccountNotFound: the file belongs to a different uid
    :raises GachaLogMixedProvider: re-raised when raised during the import
    :raises GachaLogException: any other failure, with the original error as its cause
    """
    try:
        uid = data["info"]["uid"]
        if not verify_uid:
            uid = player_id
        elif int(uid) != player_id:
            raise GachaLogAccountNotFound
        try:
            import_type = ImportType(data["info"]["export_app"])
        except ValueError:
            import_type = ImportType.UNKNOWN
        # Validate the incoming records before touching stored data.
        all_items = [GachaItem(**i) for i in data["list"]]
        await self.verify_data(all_items)
        gacha_log, _ = await self.load_history_info(str(user_id), uid)
        # Collect the ids already stored for each pool for the duplicate check.
        temp_id_data = {
            pool_name: [i.id for i in pool_data] for pool_name, pool_data in gacha_log.item_list.items()
        }
        # Run the merge in a worker thread so the event loop is not blocked.
        # get_running_loop() is the documented way to get the loop inside a coroutine.
        loop = asyncio.get_running_loop()
        # The with-statement shuts the executor down as soon as the merge is done.
        with ThreadPoolExecutor() as executor:
            new_num = await loop.run_in_executor(
                executor, self.import_data_backend, all_items, gacha_log, temp_id_data
            )
        for i in gacha_log.item_list.values():
            # Re-validate each pool after the merge, then keep it ordered by time and id.
            await self.verify_data(i)
            i.sort(key=lambda x: (x.time, x.id))
        gacha_log.update_time = datetime.datetime.now()
        gacha_log.import_type = import_type.value
        await self.save_gacha_log_info(str(user_id), uid, gacha_log)
        await self.recount_one_from_uid(user_id, player_id)
        return new_num
    except GachaLogAccountNotFound as e:
        raise GachaLogAccountNotFound("导入失败,文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同") from e
    except GachaLogMixedProvider as e:
        raise GachaLogMixedProvider from e
    except Exception as exc:
        raise GachaLogException from exc
@staticmethod
def get_game_client(player_id: int) -> StarRailClient:
    """Build a ``StarRailClient`` for the region the player's server belongs to.

    :param player_id: in-game player uid
    :return: a client with ``Region.CHINESE`` for the ``prod_gf_cn`` / ``prod_qd_cn``
        servers and ``Region.OVERSEAS`` otherwise; the language is always ``zh-cn``
    """
    server = recognize_starrail_server(player_id)
    region = Region.CHINESE if server in ["prod_gf_cn", "prod_qd_cn"] else Region.OVERSEAS
    return StarRailClient(player_id=player_id, region=region, lang="zh-cn")
async def get_gacha_log_data(self, user_id: int, player_id: int, authkey: str) -> int:
    """Fetch warp records with an authkey and merge them into the stored history.

    :param user_id: user id
    :param player_id: in-game player uid
    :param authkey: authkey passed to the client's wish history request
    :return: number of records that were not stored before
    :raises GachaLogAuthkeyTimeout: the client raised ``AuthkeyTimeout``
    :raises GachaLogInvalidAuthkey: the client raised ``InvalidAuthkey``
    """
    added = 0
    gacha_log, _ = await self.load_history_info(str(user_id), str(player_id))
    # Index stored records by id, per pool, so duplicates are found by key lookup.
    known_by_pool = {
        name: {record.id: record for record in records} for name, records in gacha_log.item_list.items()
    }
    client = self.get_game_client(player_id)
    try:
        for pool_id, pool_name in GACHA_TYPE_LIST.items():
            wish_history = await client.wish_history(pool_id.value, authkey=authkey)
            for data in wish_history:
                stamp = data.time
                item = GachaItem(
                    id=str(data.id),
                    name=data.name,
                    gacha_id=str(data.banner_id),
                    gacha_type=str(data.banner_type.value),
                    item_id=str(data.item_id),
                    item_type=data.type,
                    rank_type=str(data.rarity),
                    # Rebuilt from its fields down to the second only.
                    time=datetime.datetime(
                        stamp.year,
                        stamp.month,
                        stamp.day,
                        stamp.hour,
                        stamp.minute,
                        stamp.second,
                    ),
                )
                known = known_by_pool.setdefault(pool_name, {})
                stored = gacha_log.item_list.setdefault(pool_name, [])
                if item.id in known:
                    # Already stored: only refresh its banner id and item id.
                    previous: GachaItem = known[item.id]
                    previous.gacha_id = item.gacha_id
                    previous.item_id = item.item_id
                    continue
                stored.append(item)
                known[item.id] = item
                added += 1
    except AuthkeyTimeout as exc:
        raise GachaLogAuthkeyTimeout from exc
    except InvalidAuthkey as exc:
        raise GachaLogInvalidAuthkey from exc
    finally:
        # The client is shut down whether or not the fetch succeeded.
        await client.shutdown()
    for records in gacha_log.item_list.values():
        records.sort(key=lambda x: (x.time, x.id))
    gacha_log.update_time = datetime.datetime.now()
    gacha_log.import_type = ImportType.PaiGram.value
    await self.save_gacha_log_info(str(user_id), str(player_id), gacha_log)
    return added
@staticmethod
def check_avatar_up(name: str, gacha_time: datetime.datetime) -> bool:
if name in {"姬子", "瓦尔特", "布洛妮娅", "杰帕德", "克拉拉", "彦卿", "白露"}:
return False
return True
async def get_all_5_star_items(self, data: List[GachaItem], assets: "AssetsService", pool_name: str = "角色跃迁"):
    """Collect the five-star pulls of one pool together with their pull counts.

    :param data: warp records of the pool, walked in list order
    :param assets: asset service used to resolve icons; icons are ``""`` when falsy
    :param pool_name: name of the pool the records belong to
    :return: ``(items, remaining)`` where ``items`` lists the five-star entries in
        reverse walk order and ``remaining`` is the number of records after the
        last five-star record
    """
    avatar_pools = {"角色跃迁", "常驻跃迁", "新手跃迁"}
    light_cone_pools = {"光锥跃迁", "常驻跃迁"}
    pulls = 0
    result = []
    for item in data:
        pulls += 1
        if item.rank_type != "5":
            continue
        if item.item_type == "角色" and pool_name in avatar_pools:
            if pool_name == "新手跃迁":
                is_up, is_big = True, False
            elif pool_name == "角色跃迁":
                is_up = self.check_avatar_up(item.name, item.time)
                # "Big" when the previously collected five-star was not a rate-up one.
                is_big = (not result[-1].isUp) if result else False
            else:
                is_up, is_big = False, False
            fields = {
                "name": item.name,
                "icon": assets.avatar.square(item.name).as_uri() if assets else "",
                "count": pulls,
                "type": "角色",
                "isUp": is_up,
                "isBig": is_big,
                "time": item.time,
            }
            result.append(FiveStarItem.construct(**fields))
        elif item.item_type == "光锥" and pool_name in light_cone_pools:
            fields = {
                "name": item.name,
                "icon": assets.light_cone.icon(item.name).as_uri() if assets else "",
                "count": pulls,
                "type": "光锥",
                "isUp": False,
                "isBig": False,
                "time": item.time,
            }
            result.append(FiveStarItem.construct(**fields))
        # The counter restarts after every five-star record, collected or not.
        pulls = 0
    result.reverse()
    return result, pulls
@staticmethod
async def get_all_4_star_items(data: List[GachaItem], assets: "AssetsService"):
    """Collect the four-star pulls of one pool together with their pull counts.

    :param data: warp records of the pool, walked in list order
    :param assets: asset service used to resolve icons; icons are ``""`` when falsy
    :return: ``(items, remaining)`` where ``items`` lists the four-star entries in
        reverse walk order and ``remaining`` is the number of records after the
        last four-star record
    """
    pulls = 0
    result = []
    for item in data:
        pulls += 1
        if item.rank_type != "4":
            continue
        if item.item_type == "角色":
            fields = {
                "name": item.name,
                "icon": assets.avatar.square(item.name).as_uri() if assets else "",
                "count": pulls,
                "type": "角色",
                "time": item.time,
            }
            result.append(FourStarItem.construct(**fields))
        elif item.item_type == "光锥":
            fields = {
                "name": item.name,
                "icon": assets.light_cone.icon(item.name).as_uri() if assets else "",
                "count": pulls,
                "type": "光锥",
                "time": item.time,
            }
            result.append(FourStarItem.construct(**fields))
        # The counter restarts after every four-star record, collected or not.
        pulls = 0
    result.reverse()
    return result, pulls
@staticmethod
def get_301_pool_data(total: int, all_five: List[FiveStarItem], no_five_star: int, no_four_star: int):
    """Build the summary cells shown for a pool that tracks rate-up five-stars.

    :param total: number of records in the pool
    :param all_five: five-star entries; element 0 is inspected for the UP average
    :param no_five_star: pulls since the last five-star
    :param no_four_star: pulls since the last four-star
    :return: a list holding one row of ``{"num", "unit", "lable"}`` cells
    """
    # Five-star totals.
    five_star = len(all_five)
    up_items = [i for i in all_five if i.isUp]
    five_star_up = len(up_items)
    five_star_big = len([i for i in all_five if i.isBig])
    # Pulls that ended in a five-star.
    spent_on_five = total - no_five_star
    # Average pulls per five-star.
    five_star_avg = round(spent_on_five / five_star, 2) if five_star != 0 else 0
    # Rate-up share among five-stars that were not flagged "big", in percent.
    not_big = five_star - five_star_big
    if not_big != 0:
        small_protect = round((five_star_up - five_star_big) / not_big * 100.0, 1)
    else:
        small_protect = "0.0"
    # Five-stars that were not rate-up.
    five_star_const = five_star - five_star_up
    # Average pulls per rate-up five-star; element 0's pulls are left out when it is not rate-up.
    if five_star_up != 0:
        excluded = all_five[0].count if not all_five[0].isUp else 0
        up_avg = round((spent_on_five - excluded) / five_star_up, 2)
    else:
        up_avg = 0
    # Currency spent on rate-up five-stars (160 per pull), shortened to "w" units from 10000 up.
    up_cost = sum(i.count * 160 for i in up_items)
    up_cost = f"{round(up_cost / 10000, 2)}w" if up_cost >= 10000 else up_cost
    row = [
        {"num": no_five_star, "unit": "", "lable": "未出五星"},
        {"num": five_star, "unit": "", "lable": "五星"},
        {"num": five_star_avg, "unit": "", "lable": "五星平均"},
        {"num": small_protect, "unit": "%", "lable": "小保底不歪"},
        {"num": no_four_star, "unit": "", "lable": "未出四星"},
        {"num": five_star_const, "unit": "", "lable": "五星常驻"},
        {"num": up_avg, "unit": "", "lable": "UP平均"},
        {"num": up_cost, "unit": "", "lable": "UP花费星琼"},
    ]
    return [row]
@staticmethod
def get_200_pool_data(
    total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
):
    """Build the summary cells for a pool, including the five-star light cone count.

    :param total: number of records in the pool
    :param all_five: five-star entries
    :param all_four: four-star entries
    :param no_five_star: pulls since the last five-star
    :param no_four_star: pulls since the last four-star
    :return: a list holding one row of ``{"num", "unit", "lable"}`` cells
    """
    # Five-star count and average pulls per five-star.
    five_star = len(all_five)
    five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
    # Five-star light cones.
    five_star_weapon = len([i for i in all_five if i.type == "光锥"])
    # Four-star count and average pulls per four-star.
    four_star = len(all_four)
    four_star_avg = round((total - no_four_star) / four_star, 2) if four_star != 0 else 0
    # Most frequent four-star name (first one wins ties) and how often it appears.
    names = [i.name for i in all_four]
    if names:
        four_star_max = max(names, key=names.count)
    else:
        four_star_max = ""
    four_star_max_count = names.count(four_star_max)
    row = [
        {"num": no_five_star, "unit": "", "lable": "未出五星"},
        {"num": five_star, "unit": "", "lable": "五星"},
        {"num": five_star_avg, "unit": "", "lable": "五星平均"},
        {"num": five_star_weapon, "unit": "", "lable": "五星光锥"},
        {"num": no_four_star, "unit": "", "lable": "未出四星"},
        {"num": four_star, "unit": "", "lable": "四星"},
        {"num": four_star_avg, "unit": "", "lable": "四星平均"},
        {"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
    ]
    return [row]
@staticmethod
def get_302_pool_data(
    total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
):
    """Build the summary cells for a pool, including the four-star light cone count.

    :param total: number of records in the pool
    :param all_five: five-star entries
    :param all_four: four-star entries
    :param no_five_star: pulls since the last five-star
    :param no_four_star: pulls since the last four-star
    :return: a list holding one row of ``{"num", "unit", "lable"}`` cells
    """
    # Five-star count and average pulls per five-star.
    five_star = len(all_five)
    five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
    # Four-star light cones.
    four_star_weapon = len([i for i in all_four if i.type == "光锥"])
    # Four-star count and average pulls per four-star.
    four_star = len(all_four)
    four_star_avg = round((total - no_four_star) / four_star, 2) if four_star != 0 else 0
    # Most frequent four-star name (first one wins ties) and how often it appears.
    names = [i.name for i in all_four]
    if names:
        four_star_max = max(names, key=names.count)
    else:
        four_star_max = ""
    four_star_max_count = names.count(four_star_max)
    row = [
        {"num": no_five_star, "unit": "", "lable": "未出五星"},
        {"num": five_star, "unit": "", "lable": "五星"},
        {"num": five_star_avg, "unit": "", "lable": "五星平均"},
        {"num": four_star_weapon, "unit": "", "lable": "四星光锥"},
        {"num": no_four_star, "unit": "", "lable": "未出四星"},
        {"num": four_star, "unit": "", "lable": "四星"},
        {"num": four_star_avg, "unit": "", "lable": "四星平均"},
        {"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
    ]
    return [row]
@staticmethod
def count_fortune(pool_name: str, summon_data, weapon: bool = False):
"""
角色 光锥
欧 50以下 45以下
吉 50-60 45-55
中 60-70 55-65
非 70以上 65以上
"""
data = [45, 55, 65] if weapon else [50, 60, 70]
for i in summon_data:
for j in i:
if j.get("lable") == "五星平均":
num = j.get("num", 0)
if num == 0:
return pool_name
if num <= data[0]:
return f"{pool_name} · 欧"
if num <= data[1]:
return f"{pool_name} · 吉"
if num <= data[2]:
return f"{pool_name} · 普通"
return f"{pool_name} · 非"
return pool_name
async def get_analysis(self, user_id: int, player_id: int, pool: StarRailBannerType, assets: "AssetsService"):
    """Load a player's stored history and analyse one pool of it.

    :param user_id: user id
    :param player_id: in-game player uid
    :param pool: banner type to analyse
    :param assets: asset service forwarded to the analysis
    :return: the result of ``get_analysis_data`` for the loaded history
    :raises GachaLogNotFound: when no stored history could be loaded
    """
    gacha_log, found = await self.load_history_info(str(user_id), str(player_id))
    if found:
        return await self.get_analysis_data(gacha_log, pool, assets)
    raise GachaLogNotFound
async def get_analysis_data(
    self, gacha_log: "GachaLogInfo", pool: StarRailBannerType, assets: Optional["AssetsService"]
):
    """Analyse one pool of an already loaded warp record history.

    :param gacha_log: loaded record history
    :param pool: banner type to analyse
    :param assets: asset service used to resolve icons (may be ``None``)
    :return: dict with the masked uid, record total, pool type and display name,
        the summary row(s), the first/last timestamps and the five-/four-star lists
        (the four-star list is cut to 36 entries)
    :raises GachaLogNotFound: when the pool has no stored records
    """
    player_id = int(gacha_log.uid)
    pool_name = GACHA_TYPE_LIST[pool]
    if pool_name not in gacha_log.item_list:
        raise GachaLogNotFound
    records = gacha_log.item_list[pool_name]
    total = len(records)
    if total == 0:
        raise GachaLogNotFound
    all_five, no_five_star = await self.get_all_5_star_items(records, assets, pool_name)
    all_four, no_four_star = await self.get_all_4_star_items(records, assets)
    # Stays None for banner types that none of the branches below handle.
    summon_data = None
    if pool == StarRailBannerType.CHARACTER or pool == StarRailBannerType.NOVICE:
        summon_data = self.get_301_pool_data(total, all_five, no_five_star, no_four_star)
        pool_name = self.count_fortune(pool_name, summon_data)
    elif pool == StarRailBannerType.WEAPON:
        summon_data = self.get_302_pool_data(total, all_five, all_four, no_five_star, no_four_star)
        pool_name = self.count_fortune(pool_name, summon_data, True)
    elif pool == StarRailBannerType.PERMANENT:
        summon_data = self.get_200_pool_data(total, all_five, all_four, no_five_star, no_four_star)
        pool_name = self.count_fortune(pool_name, summon_data)
    # NOTE(review): "lastTime" comes from the first list element and "firstTime"
    # from the last one; confirm the consumer expects this naming.
    time_format = "%Y-%m-%d %H:%M"
    last_time = records[0].time.strftime(time_format)
    first_time = records[-1].time.strftime(time_format)
    return {
        "uid": mask_number(player_id),
        "allNum": total,
        "type": pool.value,
        "typeName": pool_name,
        "line": summon_data,
        "firstTime": first_time,
        "lastTime": last_time,
        "fiveLog": all_five,
        "fourLog": all_four[:36],
    }
async def get_pool_analysis(
    self, user_id: int, player_id: int, pool: StarRailBannerType, assets: "AssetsService", group: bool
) -> dict:
    """Break a player's pulls in one banner type down by individual banner.

    :param user_id: user id
    :param player_id: in-game player uid
    :param pool: banner type to analyse
    :param assets: asset service used to resolve icons
    :param group: when true, at most six banners are returned
    :return: dict with the masked uid, the pool name, the per-banner entries that
        have a positive count, and whether more than six such entries exist
    :raises GachaLogNotFound: when there is no stored history or the pool is empty
    """
    gacha_log, found = await self.load_history_info(str(user_id), str(player_id))
    if not found:
        raise GachaLogNotFound
    pool_name = GACHA_TYPE_LIST[pool]
    if pool_name not in gacha_log.item_list:
        raise GachaLogNotFound
    records = gacha_log.item_list[pool_name]
    if len(records) == 0:
        raise GachaLogNotFound
    all_five, _ = await self.get_all_5_star_items(records, assets, pool_name)
    all_four, _ = await self.get_all_4_star_items(records, assets)
    banners = [Pool(**raw) for raw in get_pool_by_id(pool.value)]
    # First pass: offer every five- and four-star entry to each banner, then let it count.
    for banner in banners:
        for entry in all_five:
            banner.parse(entry)
        for entry in all_four:
            banner.parse(entry)
        banner.count_item(records)
    # Second pass: render every banner, then keep only those with a positive count.
    rendered = [
        {
            "count": banner.count,
            "list": banner.to_list(),
            "name": banner.name,
            "start": banner.start.strftime("%Y-%m-%d"),
            "end": banner.end.strftime("%Y-%m-%d"),
        }
        for banner in banners
    ]
    pool_data = [entry for entry in rendered if entry["count"] > 0]
    return {
        "uid": mask_number(player_id),
        "typeName": pool_name,
        "pool": pool_data[:6] if group else pool_data,
        "hasMore": len(pool_data) > 6,
    }
async def get_all_five_analysis(self, user_id: int, player_id: int, assets: "AssetsService") -> dict:
    """Summarise a player's five-star pulls, one entry per stored pool.

    :param user_id: user id
    :param player_id: in-game player uid
    :param assets: asset service used to resolve icons
    :return: dict with the masked uid, a fixed title, one entry per stored pool
        and ``hasMore`` always ``False``
    :raises GachaLogNotFound: when no stored history could be loaded
    """
    gacha_log, found = await self.load_history_info(str(user_id), str(player_id))
    if not found:
        raise GachaLogNotFound
    pools = []
    for pool_name, records in gacha_log.item_list.items():
        # One synthetic banner per pool, running from a fixed start date until now.
        window_end = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        summary = Pool(
            five=[pool_name],
            four=[],
            name=pool_name,
            to=window_end,
            # "from" is a Python keyword, so it has to be passed through a dict.
            **{"from": "2020-09-28 00:00:00"},
        )
        all_five, _ = await self.get_all_5_star_items(records, assets, pool_name)
        for entry in all_five:
            summary.parse(entry)
        summary.count_item(records)
        pools.append(summary)
    pool_data = []
    for summary in pools:
        pool_data.append(
            {
                "count": summary.count,
                "list": summary.to_list(),
                "name": summary.name,
                "start": summary.start.strftime("%Y-%m-%d"),
                "end": summary.end.strftime("%Y-%m-%d"),
            }
        )
    return {
        "uid": mask_number(player_id),
        "typeName": "五星列表",
        "pool": pool_data,
        "hasMore": False,
    }