mirror of
https://github.com/PaiGramTeam/PaiGram.git
synced 2024-11-29 19:08:48 +00:00
740 lines
30 KiB
Python
740 lines
30 KiB
Python
import asyncio
|
||
import contextlib
|
||
import datetime
|
||
import json
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from os import PathLike
|
||
from pathlib import Path
|
||
from typing import Dict, IO, List, Optional, Tuple, Union
|
||
|
||
import aiofiles
|
||
from genshin import AuthkeyTimeout, Client, InvalidAuthkey
|
||
from genshin.models import BannerType
|
||
from openpyxl import load_workbook
|
||
|
||
from core.dependence.assets import AssetsService
|
||
from metadata.pool.pool import get_pool_by_id
|
||
from metadata.shortname import roleToId, weaponToId
|
||
from modules.gacha_log.const import GACHA_TYPE_LIST, PAIMONMOE_VERSION
|
||
from modules.gacha_log.error import (
|
||
GachaLogAccountNotFound,
|
||
GachaLogAuthkeyTimeout,
|
||
GachaLogException,
|
||
GachaLogFileError,
|
||
GachaLogInvalidAuthkey,
|
||
GachaLogMixedProvider,
|
||
GachaLogNotFound,
|
||
PaimonMoeGachaLogFileError,
|
||
)
|
||
from modules.gacha_log.models import (
|
||
FiveStarItem,
|
||
FourStarItem,
|
||
GachaItem,
|
||
GachaLogInfo,
|
||
ImportType,
|
||
ItemType,
|
||
Pool,
|
||
UIGFGachaType,
|
||
UIGFInfo,
|
||
UIGFItem,
|
||
UIGFModel,
|
||
)
|
||
from utils.const import PROJECT_ROOT
|
||
|
||
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
|
||
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
|
||
|
||
|
||
class GachaLog:
|
||
def __init__(self, gacha_log_path: Path = GACHA_LOG_PATH):
|
||
self.gacha_log_path = gacha_log_path
|
||
|
||
@staticmethod
|
||
async def load_json(path):
|
||
async with aiofiles.open(path, "r", encoding="utf-8") as f:
|
||
return json.loads(await f.read())
|
||
|
||
@staticmethod
|
||
async def save_json(path, data):
|
||
async with aiofiles.open(path, "w", encoding="utf-8") as f:
|
||
if isinstance(data, dict):
|
||
return await f.write(json.dumps(data, ensure_ascii=False, indent=4))
|
||
await f.write(data)
|
||
|
||
async def load_history_info(
|
||
self, user_id: str, uid: str, only_status: bool = False
|
||
) -> Tuple[Optional[GachaLogInfo], bool]:
|
||
"""读取历史抽卡记录数据
|
||
:param user_id: 用户id
|
||
:param uid: 原神uid
|
||
:param only_status: 是否只读取状态
|
||
:return: 抽卡记录数据
|
||
"""
|
||
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
|
||
if only_status:
|
||
return None, file_path.exists()
|
||
if not file_path.exists():
|
||
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
|
||
try:
|
||
return GachaLogInfo.parse_obj(await self.load_json(file_path)), True
|
||
except json.decoder.JSONDecodeError:
|
||
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
|
||
|
||
async def remove_history_info(self, user_id: str, uid: str) -> bool:
|
||
"""删除历史抽卡记录数据
|
||
:param user_id: 用户id
|
||
:param uid: 原神uid
|
||
:return: 是否删除成功
|
||
"""
|
||
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
|
||
file_bak_path = self.gacha_log_path / f"{user_id}-{uid}.json.bak"
|
||
file_export_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
|
||
with contextlib.suppress(Exception):
|
||
file_bak_path.unlink(missing_ok=True)
|
||
with contextlib.suppress(Exception):
|
||
file_export_path.unlink(missing_ok=True)
|
||
if file_path.exists():
|
||
try:
|
||
file_path.unlink()
|
||
except PermissionError:
|
||
return False
|
||
return True
|
||
return False
|
||
|
||
async def save_gacha_log_info(self, user_id: str, uid: str, info: GachaLogInfo):
|
||
"""保存抽卡记录数据
|
||
:param user_id: 用户id
|
||
:param uid: 原神uid
|
||
:param info: 抽卡记录数据
|
||
"""
|
||
save_path = self.gacha_log_path / f"{user_id}-{uid}.json"
|
||
save_path_bak = self.gacha_log_path / f"{user_id}-{uid}.json.bak"
|
||
# 将旧数据备份一次
|
||
with contextlib.suppress(PermissionError):
|
||
if save_path.exists():
|
||
if save_path_bak.exists():
|
||
save_path_bak.unlink()
|
||
save_path.rename(save_path.parent / f"{save_path.name}.bak")
|
||
# 写入数据
|
||
await self.save_json(save_path, info.json())
|
||
|
||
async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]:
|
||
"""抽卡日记转换为 UIGF 格式
|
||
:param user_id: 用户ID
|
||
:param uid: 游戏UID
|
||
:return: 转换是否成功、转换信息、UIGF文件目录
|
||
"""
|
||
data, state = await self.load_history_info(user_id, uid)
|
||
if not state:
|
||
raise GachaLogNotFound
|
||
save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
|
||
info = UIGFModel(info=UIGFInfo(uid=uid, export_app=ImportType.PaiGram.value, export_app_version="v3"), list=[])
|
||
for items in data.item_list.values():
|
||
for item in items:
|
||
info.list.append(
|
||
UIGFItem(
|
||
id=item.id,
|
||
name=item.name,
|
||
gacha_type=item.gacha_type,
|
||
item_type=item.item_type,
|
||
rank_type=item.rank_type,
|
||
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
|
||
uigf_gacha_type=item.gacha_type,
|
||
)
|
||
)
|
||
await self.save_json(save_path, json.loads(info.json()))
|
||
return save_path
|
||
|
||
@staticmethod
|
||
async def verify_data(data: List[GachaItem]) -> bool:
|
||
try:
|
||
total = len(data)
|
||
five_star = len([i for i in data if i.rank_type == "5"])
|
||
four_star = len([i for i in data if i.rank_type == "4"])
|
||
if total > 50:
|
||
if total <= five_star * 15:
|
||
raise GachaLogFileError("检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。")
|
||
if four_star < five_star:
|
||
raise GachaLogFileError("检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。")
|
||
return True
|
||
except Exception as exc: # pylint: disable=W0703
|
||
raise GachaLogFileError from exc
|
||
|
||
@staticmethod
|
||
def import_data_backend(all_items: List[GachaItem], gacha_log: GachaLogInfo, temp_id_data: Dict) -> int:
|
||
new_num = 0
|
||
for item_info in all_items:
|
||
pool_name = GACHA_TYPE_LIST[BannerType(int(item_info.gacha_type))]
|
||
if item_info.id not in temp_id_data[pool_name]:
|
||
gacha_log.item_list[pool_name].append(item_info)
|
||
temp_id_data[pool_name].append(item_info.id)
|
||
new_num += 1
|
||
return new_num
|
||
|
||
async def import_gacha_log_data(self, user_id: int, client: Client, data: dict, verify_uid: bool = True) -> int:
|
||
new_num = 0
|
||
try:
|
||
uid = data["info"]["uid"]
|
||
if not verify_uid:
|
||
uid = client.uid
|
||
elif int(uid) != client.uid:
|
||
raise GachaLogAccountNotFound
|
||
try:
|
||
import_type = ImportType(data["info"]["export_app"])
|
||
except ValueError:
|
||
import_type = ImportType.UNKNOWN
|
||
# 检查导入数据是否合法
|
||
all_items = [GachaItem(**i) for i in data["list"]]
|
||
await self.verify_data(all_items)
|
||
gacha_log, status = await self.load_history_info(str(user_id), uid)
|
||
if import_type == ImportType.PAIMONMOE:
|
||
if status and gacha_log.get_import_type != ImportType.PAIMONMOE:
|
||
raise GachaLogMixedProvider
|
||
elif status and gacha_log.get_import_type == ImportType.PAIMONMOE:
|
||
raise GachaLogMixedProvider
|
||
# 将唯一 id 放入临时数据中,加快查找速度
|
||
temp_id_data = {
|
||
pool_name: [i.id for i in pool_data] for pool_name, pool_data in gacha_log.item_list.items()
|
||
}
|
||
# 使用新线程进行遍历,避免堵塞主线程
|
||
loop = asyncio.get_event_loop()
|
||
# 可以使用with语句来确保线程执行完成后及时被清理
|
||
with ThreadPoolExecutor() as executor:
|
||
new_num = await loop.run_in_executor(
|
||
executor, self.import_data_backend, all_items, gacha_log, temp_id_data
|
||
)
|
||
for i in gacha_log.item_list.values():
|
||
# 检查导入后的数据是否合法
|
||
await self.verify_data(i)
|
||
i.sort(key=lambda x: (x.time, x.id))
|
||
gacha_log.update_time = datetime.datetime.now()
|
||
gacha_log.import_type = import_type.value
|
||
await self.save_gacha_log_info(str(user_id), uid, gacha_log)
|
||
return new_num
|
||
except GachaLogAccountNotFound as e:
|
||
raise GachaLogAccountNotFound("导入失败,文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同") from e
|
||
except GachaLogMixedProvider as e:
|
||
raise GachaLogMixedProvider from e
|
||
except Exception as exc:
|
||
raise GachaLogException from exc
|
||
|
||
async def get_gacha_log_data(self, user_id: int, client: Client, authkey: str) -> int:
|
||
"""使用authkey获取抽卡记录数据,并合并旧数据
|
||
:param user_id: 用户id
|
||
:param client: genshin client
|
||
:param authkey: authkey
|
||
:return: 更新结果
|
||
"""
|
||
new_num = 0
|
||
gacha_log, _ = await self.load_history_info(str(user_id), str(client.uid))
|
||
if gacha_log.get_import_type == ImportType.PAIMONMOE:
|
||
raise GachaLogMixedProvider
|
||
# 将唯一 id 放入临时数据中,加快查找速度
|
||
temp_id_data = {pool_name: [i.id for i in pool_data] for pool_name, pool_data in gacha_log.item_list.items()}
|
||
try:
|
||
for pool_id, pool_name in GACHA_TYPE_LIST.items():
|
||
async for data in client.wish_history(pool_id, authkey=authkey):
|
||
item = GachaItem(
|
||
id=str(data.id),
|
||
name=data.name,
|
||
gacha_type=str(data.banner_type.value),
|
||
item_type=data.type,
|
||
rank_type=str(data.rarity),
|
||
time=datetime.datetime(
|
||
data.time.year,
|
||
data.time.month,
|
||
data.time.day,
|
||
data.time.hour,
|
||
data.time.minute,
|
||
data.time.second,
|
||
),
|
||
)
|
||
|
||
if item.id not in temp_id_data[pool_name]:
|
||
gacha_log.item_list[pool_name].append(item)
|
||
temp_id_data[pool_name].append(item.id)
|
||
new_num += 1
|
||
except AuthkeyTimeout as exc:
|
||
raise GachaLogAuthkeyTimeout from exc
|
||
except InvalidAuthkey as exc:
|
||
raise GachaLogInvalidAuthkey from exc
|
||
for i in gacha_log.item_list.values():
|
||
i.sort(key=lambda x: (x.time, x.id))
|
||
gacha_log.update_time = datetime.datetime.now()
|
||
gacha_log.import_type = ImportType.UIGF.value
|
||
await self.save_gacha_log_info(str(user_id), str(client.uid), gacha_log)
|
||
return new_num
|
||
|
||
@staticmethod
|
||
def check_avatar_up(name: str, gacha_time: datetime.datetime) -> bool:
|
||
if name in {"莫娜", "七七", "迪卢克", "琴", "迪希雅"}:
|
||
return False
|
||
if name == "刻晴":
|
||
start_time = datetime.datetime.strptime("2021-02-17 18:00:00", "%Y-%m-%d %H:%M:%S")
|
||
end_time = datetime.datetime.strptime("2021-03-02 15:59:59", "%Y-%m-%d %H:%M:%S")
|
||
if not start_time < gacha_time < end_time:
|
||
return False
|
||
elif name == "提纳里":
|
||
start_time = datetime.datetime.strptime("2022-08-24 06:00:00", "%Y-%m-%d %H:%M:%S")
|
||
end_time = datetime.datetime.strptime("2022-09-09 17:59:59", "%Y-%m-%d %H:%M:%S")
|
||
if not start_time < gacha_time < end_time:
|
||
return False
|
||
return True
|
||
|
||
async def get_all_5_star_items(self, data: List[GachaItem], assets: AssetsService, pool_name: str = "角色祈愿"):
|
||
"""
|
||
获取所有5星角色
|
||
:param data: 抽卡记录
|
||
:param assets: 资源服务
|
||
:param pool_name: 池子名称
|
||
:return: 5星角色列表
|
||
"""
|
||
count = 0
|
||
result = []
|
||
for item in data:
|
||
count += 1
|
||
if item.rank_type == "5":
|
||
if item.item_type == "角色" and pool_name in {"角色祈愿", "常驻祈愿"}:
|
||
data = {
|
||
"name": item.name,
|
||
"icon": (await assets.avatar(roleToId(item.name)).icon()).as_uri(),
|
||
"count": count,
|
||
"type": "角色",
|
||
"isUp": self.check_avatar_up(item.name, item.time) if pool_name == "角色祈愿" else False,
|
||
"isBig": (not result[-1].isUp) if result and pool_name == "角色祈愿" else False,
|
||
"time": item.time,
|
||
}
|
||
result.append(FiveStarItem.construct(**data))
|
||
elif item.item_type == "武器" and pool_name in {"武器祈愿", "常驻祈愿"}:
|
||
data = {
|
||
"name": item.name,
|
||
"icon": (await assets.weapon(weaponToId(item.name)).icon()).as_uri(),
|
||
"count": count,
|
||
"type": "武器",
|
||
"isUp": False,
|
||
"isBig": False,
|
||
"time": item.time,
|
||
}
|
||
result.append(FiveStarItem.construct(**data))
|
||
count = 0
|
||
result.reverse()
|
||
return result, count
|
||
|
||
@staticmethod
|
||
async def get_all_4_star_items(data: List[GachaItem], assets: AssetsService):
|
||
"""
|
||
获取 no_fout_star
|
||
:param data: 抽卡记录
|
||
:param assets: 资源服务
|
||
:return: no_fout_star
|
||
"""
|
||
count = 0
|
||
result = []
|
||
for item in data:
|
||
count += 1
|
||
if item.rank_type == "4":
|
||
if item.item_type == "角色":
|
||
data = {
|
||
"name": item.name,
|
||
"icon": (await assets.avatar(roleToId(item.name)).icon()).as_uri(),
|
||
"count": count,
|
||
"type": "角色",
|
||
"time": item.time,
|
||
}
|
||
result.append(FourStarItem.construct(**data))
|
||
elif item.item_type == "武器":
|
||
data = {
|
||
"name": item.name,
|
||
"icon": (await assets.weapon(weaponToId(item.name)).icon()).as_uri(),
|
||
"count": count,
|
||
"type": "武器",
|
||
"time": item.time,
|
||
}
|
||
result.append(FourStarItem.construct(**data))
|
||
count = 0
|
||
result.reverse()
|
||
return result, count
|
||
|
||
@staticmethod
|
||
def get_301_pool_data(total: int, all_five: List[FiveStarItem], no_five_star: int, no_four_star: int):
|
||
# 总共五星
|
||
five_star = len(all_five)
|
||
five_star_up = len([i for i in all_five if i.isUp])
|
||
five_star_big = len([i for i in all_five if i.isBig])
|
||
# 五星平均
|
||
five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
|
||
# 小保底不歪
|
||
small_protect = (
|
||
round((five_star_up - five_star_big) / (five_star - five_star_big) * 100.0, 1)
|
||
if five_star - five_star_big != 0
|
||
else "0.0"
|
||
)
|
||
# 五星常驻
|
||
five_star_const = five_star - five_star_up
|
||
# UP 平均
|
||
up_avg = round((total - no_five_star) / five_star_up, 2) if five_star_up != 0 else 0
|
||
# UP 花费原石
|
||
up_cost = sum(i.count * 160 for i in all_five if i.isUp)
|
||
up_cost = f"{round(up_cost / 10000, 2)}w" if up_cost >= 10000 else up_cost
|
||
return [
|
||
[
|
||
{"num": no_five_star, "unit": "抽", "lable": "未出五星"},
|
||
{"num": five_star, "unit": "个", "lable": "五星"},
|
||
{"num": five_star_avg, "unit": "抽", "lable": "五星平均"},
|
||
{"num": small_protect, "unit": "%", "lable": "小保底不歪"},
|
||
],
|
||
[
|
||
{"num": no_four_star, "unit": "抽", "lable": "未出四星"},
|
||
{"num": five_star_const, "unit": "个", "lable": "五星常驻"},
|
||
{"num": up_avg, "unit": "抽", "lable": "UP平均"},
|
||
{"num": up_cost, "unit": "", "lable": "UP花费原石"},
|
||
],
|
||
]
|
||
|
||
@staticmethod
|
||
def get_200_pool_data(
|
||
total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
|
||
):
|
||
# 总共五星
|
||
five_star = len(all_five)
|
||
# 五星平均
|
||
five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
|
||
# 五星武器
|
||
five_star_weapon = len([i for i in all_five if i.type == "武器"])
|
||
# 总共四星
|
||
four_star = len(all_four)
|
||
# 四星平均
|
||
four_star_avg = round((total - no_four_star) / four_star, 2) if four_star != 0 else 0
|
||
# 四星最多
|
||
four_star_name_list = [i.name for i in all_four]
|
||
four_star_max = max(four_star_name_list, key=four_star_name_list.count) if four_star_name_list else ""
|
||
four_star_max_count = four_star_name_list.count(four_star_max)
|
||
return [
|
||
[
|
||
{"num": no_five_star, "unit": "抽", "lable": "未出五星"},
|
||
{"num": five_star, "unit": "个", "lable": "五星"},
|
||
{"num": five_star_avg, "unit": "抽", "lable": "五星平均"},
|
||
{"num": five_star_weapon, "unit": "个", "lable": "五星武器"},
|
||
],
|
||
[
|
||
{"num": no_four_star, "unit": "抽", "lable": "未出四星"},
|
||
{"num": four_star, "unit": "个", "lable": "四星"},
|
||
{"num": four_star_avg, "unit": "抽", "lable": "四星平均"},
|
||
{"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
|
||
],
|
||
]
|
||
|
||
@staticmethod
|
||
def get_302_pool_data(
|
||
total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
|
||
):
|
||
# 总共五星
|
||
five_star = len(all_five)
|
||
# 五星平均
|
||
five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
|
||
# 四星武器
|
||
four_star_weapon = len([i for i in all_four if i.type == "武器"])
|
||
# 总共四星
|
||
four_star = len(all_four)
|
||
# 四星平均
|
||
four_star_avg = round((total - no_four_star) / four_star, 2) if four_star != 0 else 0
|
||
# 四星最多
|
||
four_star_name_list = [i.name for i in all_four]
|
||
four_star_max = max(four_star_name_list, key=four_star_name_list.count) if four_star_name_list else ""
|
||
four_star_max_count = four_star_name_list.count(four_star_max)
|
||
return [
|
||
[
|
||
{"num": no_five_star, "unit": "抽", "lable": "未出五星"},
|
||
{"num": five_star, "unit": "个", "lable": "五星"},
|
||
{"num": five_star_avg, "unit": "抽", "lable": "五星平均"},
|
||
{"num": four_star_weapon, "unit": "个", "lable": "四星武器"},
|
||
],
|
||
[
|
||
{"num": no_four_star, "unit": "抽", "lable": "未出四星"},
|
||
{"num": four_star, "unit": "个", "lable": "四星"},
|
||
{"num": four_star_avg, "unit": "抽", "lable": "四星平均"},
|
||
{"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
|
||
],
|
||
]
|
||
|
||
@staticmethod
|
||
def count_fortune(pool_name: str, summon_data, weapon: bool = False):
|
||
"""
|
||
角色 武器
|
||
欧 50以下 45以下
|
||
吉 50-60 45-55
|
||
中 60-70 55-65
|
||
非 70以上 65以上
|
||
"""
|
||
data = [45, 55, 65] if weapon else [50, 60, 70]
|
||
for i in summon_data:
|
||
for j in i:
|
||
if j.get("lable") == "五星平均":
|
||
num = j.get("num", 0)
|
||
if num == 0:
|
||
return pool_name
|
||
if num <= data[0]:
|
||
return f"{pool_name} · 欧"
|
||
if num <= data[1]:
|
||
return f"{pool_name} · 吉"
|
||
if num <= data[2]:
|
||
return f"{pool_name} · 普通"
|
||
return f"{pool_name} · 非"
|
||
return pool_name
|
||
|
||
async def get_analysis(self, user_id: int, client: Client, pool: BannerType, assets: AssetsService):
|
||
"""
|
||
获取抽卡记录分析数据
|
||
:param user_id: 用户id
|
||
:param client: genshin client
|
||
:param pool: 池子类型
|
||
:param assets: 资源服务
|
||
:return: 分析数据
|
||
"""
|
||
gacha_log, status = await self.load_history_info(str(user_id), str(client.uid))
|
||
if not status:
|
||
raise GachaLogNotFound
|
||
pool_name = GACHA_TYPE_LIST[pool]
|
||
data = gacha_log.item_list[pool_name]
|
||
total = len(data)
|
||
if total == 0:
|
||
raise GachaLogNotFound
|
||
all_five, no_five_star = await self.get_all_5_star_items(data, assets, pool_name)
|
||
all_four, no_four_star = await self.get_all_4_star_items(data, assets)
|
||
summon_data = None
|
||
if pool == BannerType.CHARACTER1:
|
||
summon_data = self.get_301_pool_data(total, all_five, no_five_star, no_four_star)
|
||
pool_name = self.count_fortune(pool_name, summon_data)
|
||
elif pool == BannerType.WEAPON:
|
||
summon_data = self.get_302_pool_data(total, all_five, all_four, no_five_star, no_four_star)
|
||
pool_name = self.count_fortune(pool_name, summon_data, True)
|
||
elif pool == BannerType.PERMANENT:
|
||
summon_data = self.get_200_pool_data(total, all_five, all_four, no_five_star, no_four_star)
|
||
pool_name = self.count_fortune(pool_name, summon_data)
|
||
last_time = data[0].time.strftime("%Y-%m-%d %H:%M")
|
||
first_time = data[-1].time.strftime("%Y-%m-%d %H:%M")
|
||
return {
|
||
"uid": client.uid,
|
||
"allNum": total,
|
||
"type": pool.value,
|
||
"typeName": pool_name,
|
||
"line": summon_data,
|
||
"firstTime": first_time,
|
||
"lastTime": last_time,
|
||
"fiveLog": all_five,
|
||
"fourLog": all_four[:18],
|
||
}
|
||
|
||
async def get_pool_analysis(
|
||
self, user_id: int, client: Client, pool: BannerType, assets: AssetsService, group: bool
|
||
) -> dict:
|
||
"""获取抽卡记录分析数据
|
||
:param user_id: 用户id
|
||
:param client: genshin client
|
||
:param pool: 池子类型
|
||
:param assets: 资源服务
|
||
:param group: 是否群组
|
||
:return: 分析数据
|
||
"""
|
||
gacha_log, status = await self.load_history_info(str(user_id), str(client.uid))
|
||
if not status:
|
||
raise GachaLogNotFound
|
||
pool_name = GACHA_TYPE_LIST[pool]
|
||
data = gacha_log.item_list[pool_name]
|
||
total = len(data)
|
||
if total == 0:
|
||
raise GachaLogNotFound
|
||
all_five, _ = await self.get_all_5_star_items(data, assets, pool_name)
|
||
all_four, _ = await self.get_all_4_star_items(data, assets)
|
||
pool_data = []
|
||
up_pool_data = [Pool(**i) for i in get_pool_by_id(pool.value)]
|
||
for up_pool in up_pool_data:
|
||
for item in all_five:
|
||
up_pool.parse(item)
|
||
for item in all_four:
|
||
up_pool.parse(item)
|
||
up_pool.count_item(data)
|
||
for up_pool in up_pool_data:
|
||
pool_data.append(
|
||
{
|
||
"count": up_pool.count,
|
||
"list": up_pool.to_list(),
|
||
"name": up_pool.name,
|
||
"start": up_pool.start.strftime("%Y-%m-%d"),
|
||
"end": up_pool.end.strftime("%Y-%m-%d"),
|
||
}
|
||
)
|
||
pool_data = [i for i in pool_data if i["count"] > 0]
|
||
return {
|
||
"uid": client.uid,
|
||
"typeName": pool_name,
|
||
"pool": pool_data[:6] if group else pool_data,
|
||
"hasMore": len(pool_data) > 6,
|
||
}
|
||
|
||
async def get_all_five_analysis(self, user_id: int, client: Client, assets: AssetsService) -> dict:
|
||
"""获取五星抽卡记录分析数据
|
||
:param user_id: 用户id
|
||
:param client: genshin client
|
||
:param assets: 资源服务
|
||
:return: 分析数据
|
||
"""
|
||
gacha_log, status = await self.load_history_info(str(user_id), str(client.uid))
|
||
if not status:
|
||
raise GachaLogNotFound
|
||
pools = []
|
||
for pool_name, items in gacha_log.item_list.items():
|
||
pool = Pool(
|
||
five=[pool_name],
|
||
four=[],
|
||
name=pool_name,
|
||
to=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||
**{"from": "2020-09-28 00:00:00"},
|
||
)
|
||
all_five, _ = await self.get_all_5_star_items(items, assets, pool_name)
|
||
for item in all_five:
|
||
pool.parse(item)
|
||
pool.count_item(items)
|
||
pools.append(pool)
|
||
pool_data = [
|
||
{
|
||
"count": up_pool.count,
|
||
"list": up_pool.to_list(),
|
||
"name": up_pool.name,
|
||
"start": up_pool.start.strftime("%Y-%m-%d"),
|
||
"end": up_pool.end.strftime("%Y-%m-%d"),
|
||
}
|
||
for up_pool in pools
|
||
]
|
||
return {
|
||
"uid": client.uid,
|
||
"typeName": "五星列表",
|
||
"pool": pool_data,
|
||
"hasMore": False,
|
||
}
|
||
|
||
@staticmethod
|
||
def convert_xlsx_to_uigf(file: Union[str, PathLike, IO[bytes]], zh_dict: Dict) -> Dict:
|
||
"""转换 paimone.moe 或 非小酋 导出 xlsx 数据为 UIGF 格式
|
||
:param file: 导出的 xlsx 文件
|
||
:param zh_dict:
|
||
:return: UIGF 格式数据
|
||
"""
|
||
|
||
def from_paimon_moe(
|
||
uigf_gacha_type: UIGFGachaType, item_type: str, name: str, date_string: str, rank_type: int, _id: int
|
||
) -> UIGFItem:
|
||
item_type = ItemType.CHARACTER if item_type == "Character" else ItemType.WEAPON
|
||
return UIGFItem(
|
||
id=str(_id),
|
||
name=zh_dict[name],
|
||
gacha_type=uigf_gacha_type,
|
||
item_type=item_type,
|
||
rank_type=str(rank_type),
|
||
time=date_string,
|
||
uigf_gacha_type=uigf_gacha_type,
|
||
)
|
||
|
||
def from_uigf(
|
||
uigf_gacha_type: str,
|
||
gacha__type: str,
|
||
item_type: str,
|
||
name: str,
|
||
date_string: str,
|
||
rank_type: str,
|
||
_id: str,
|
||
) -> UIGFItem:
|
||
return UIGFItem(
|
||
id=_id,
|
||
name=name,
|
||
gacha_type=gacha__type,
|
||
item_type=item_type,
|
||
rank_type=rank_type,
|
||
time=date_string,
|
||
uigf_gacha_type=uigf_gacha_type,
|
||
)
|
||
|
||
def from_fxq(
|
||
uigf_gacha_type: UIGFGachaType, item_type: str, name: str, date_string: str, rank_type: int, _id: int
|
||
) -> UIGFItem:
|
||
item_type = ItemType.CHARACTER if item_type == "角色" else ItemType.WEAPON
|
||
return UIGFItem(
|
||
id=str(_id),
|
||
name=name,
|
||
gacha_type=uigf_gacha_type,
|
||
item_type=item_type,
|
||
rank_type=str(rank_type),
|
||
time=date_string,
|
||
uigf_gacha_type=uigf_gacha_type,
|
||
)
|
||
|
||
wb = load_workbook(file)
|
||
wb_len = len(wb.worksheets)
|
||
|
||
if wb_len == 6:
|
||
import_type = ImportType.PAIMONMOE
|
||
elif wb_len == 5:
|
||
import_type = ImportType.UIGF
|
||
elif wb_len == 4:
|
||
import_type = ImportType.FXQ
|
||
else:
|
||
raise GachaLogFileError("xlsx 格式错误")
|
||
|
||
paimonmoe_sheets = {
|
||
UIGFGachaType.BEGINNER: "Beginners' Wish",
|
||
UIGFGachaType.STANDARD: "Standard",
|
||
UIGFGachaType.CHARACTER: "Character Event",
|
||
UIGFGachaType.WEAPON: "Weapon Event",
|
||
}
|
||
fxq_sheets = {
|
||
UIGFGachaType.BEGINNER: "新手祈愿",
|
||
UIGFGachaType.STANDARD: "常驻祈愿",
|
||
UIGFGachaType.CHARACTER: "角色活动祈愿",
|
||
UIGFGachaType.WEAPON: "武器活动祈愿",
|
||
}
|
||
data = UIGFModel(info=UIGFInfo(export_app=import_type.value), list=[])
|
||
if import_type == ImportType.PAIMONMOE:
|
||
ws = wb["Information"]
|
||
if ws["B2"].value != PAIMONMOE_VERSION:
|
||
raise PaimonMoeGachaLogFileError(file_version=ws["B2"].value, support_version=PAIMONMOE_VERSION)
|
||
count = 1
|
||
for gacha_type in paimonmoe_sheets:
|
||
ws = wb[paimonmoe_sheets[gacha_type]]
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
if row[0] is None:
|
||
break
|
||
data.list.append(from_paimon_moe(gacha_type, row[0], row[1], row[2], row[3], count))
|
||
count += 1
|
||
elif import_type == ImportType.UIGF:
|
||
ws = wb["原始数据"]
|
||
type_map = {}
|
||
count = 0
|
||
for row in ws["1"]:
|
||
if row.value is None:
|
||
break
|
||
type_map[row.value] = count
|
||
count += 1
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
if row[0] is None:
|
||
break
|
||
data.list.append(
|
||
from_uigf(
|
||
row[type_map["uigf_gacha_type"]],
|
||
row[type_map["gacha_type"]],
|
||
row[type_map["item_type"]],
|
||
row[type_map["name"]],
|
||
row[type_map["time"]],
|
||
row[type_map["rank_type"]],
|
||
row[type_map["id"]],
|
||
)
|
||
)
|
||
else:
|
||
for gacha_type in fxq_sheets:
|
||
ws = wb[fxq_sheets[gacha_type]]
|
||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||
if row[0] is None:
|
||
break
|
||
data.list.append(from_fxq(gacha_type, row[2], row[1], row[0], row[3], row[6]))
|
||
|
||
return json.loads(data.json())
|