MibooGram/modules/gacha_log/log.py

676 lines
28 KiB
Python
Raw Normal View History

import asyncio
2022-10-08 08:50:02 +00:00
import contextlib
import datetime
import json
from concurrent.futures import ThreadPoolExecutor
2022-10-08 08:50:02 +00:00
from pathlib import Path
from typing import Dict, List, Optional, Tuple, TYPE_CHECKING
2022-10-08 08:50:02 +00:00
import aiofiles
from simnet import ZZZClient, Region
from simnet.errors import AuthkeyTimeout, InvalidAuthkey
from simnet.models.zzz.wish import ZZZBannerType
from simnet.utils.player import recognize_zzz_server
2022-10-08 08:50:02 +00:00
from metadata.pool.pool import get_pool_by_id
from modules.gacha_log.const import GACHA_TYPE_LIST
from modules.gacha_log.error import (
GachaLogAccountNotFound,
GachaLogAuthkeyTimeout,
GachaLogException,
GachaLogFileError,
GachaLogInvalidAuthkey,
GachaLogMixedProvider,
GachaLogNotFound,
)
from modules.gacha_log.models import (
FiveStarItem,
FourStarItem,
GachaItem,
GachaLogInfo,
ImportType,
Pool,
ZZZGFInfo,
ZZZGFItem,
ZZZGFModel,
)
from utils.const import PROJECT_ROOT
from utils.uid import mask_number
if TYPE_CHECKING:
from core.dependence.assets import AssetsService
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "signal_log")
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
2022-10-08 08:50:02 +00:00
class GachaLog:
def __init__(self, gacha_log_path: Path = GACHA_LOG_PATH):
self.gacha_log_path = gacha_log_path
2022-10-08 08:50:02 +00:00
@staticmethod
async def load_json(path):
2022-10-09 03:18:49 +00:00
async with aiofiles.open(path, "r", encoding="utf-8") as f:
2022-10-08 08:50:02 +00:00
return json.loads(await f.read())
@staticmethod
async def save_json(path, data):
2022-10-09 03:18:49 +00:00
async with aiofiles.open(path, "w", encoding="utf-8") as f:
2022-10-08 08:50:02 +00:00
if isinstance(data, dict):
return await f.write(json.dumps(data, ensure_ascii=False, indent=4))
await f.write(data)
2022-10-09 03:18:49 +00:00
async def load_history_info(
self, user_id: str, uid: str, only_status: bool = False
2022-10-09 03:18:49 +00:00
) -> Tuple[Optional[GachaLogInfo], bool]:
"""读取历史调频记录数据
2022-10-08 08:50:02 +00:00
:param user_id: 用户id
:param uid: 原神uid
2022-10-08 15:40:15 +00:00
:param only_status: 是否只读取状态
:return: 调频记录数据
2022-10-08 08:50:02 +00:00
"""
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
2022-10-08 15:40:15 +00:00
if only_status:
return None, file_path.exists()
if not file_path.exists():
2022-10-09 03:18:49 +00:00
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
2022-10-08 15:40:15 +00:00
try:
return GachaLogInfo.parse_obj(await self.load_json(file_path)), True
2022-10-08 15:40:15 +00:00
except json.decoder.JSONDecodeError:
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
async def remove_history_info(self, user_id: str, uid: str) -> bool:
"""删除历史调频记录数据
2022-10-08 15:40:15 +00:00
:param user_id: 用户id
:param uid: 原神uid
:return: 是否删除成功
"""
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
file_bak_path = self.gacha_log_path / f"{user_id}-{uid}.json.bak"
file_export_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
2022-10-08 15:40:15 +00:00
with contextlib.suppress(Exception):
file_bak_path.unlink(missing_ok=True)
with contextlib.suppress(Exception):
file_export_path.unlink(missing_ok=True)
if file_path.exists():
try:
file_path.unlink()
except PermissionError:
return False
return True
return False
2022-10-08 08:50:02 +00:00
2023-12-16 10:01:27 +00:00
async def move_history_info(self, user_id: str, uid: str, new_user_id: str) -> bool:
"""移动历史抽卡记录数据
:param user_id: 用户id
:param uid: 原神uid
:param new_user_id: 新用户id
:return: 是否移动成功
"""
old_file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
new_file_path = self.gacha_log_path / f"{new_user_id}-{uid}.json"
if (not old_file_path.exists()) or new_file_path.exists():
return False
try:
old_file_path.rename(new_file_path)
return True
except PermissionError:
return False
async def save_gacha_log_info(self, user_id: str, uid: str, info: GachaLogInfo):
"""保存调频记录数据
2022-10-08 08:50:02 +00:00
:param user_id: 用户id
:param uid: 玩家uid
:param info: 调频记录数据
2022-10-08 08:50:02 +00:00
"""
save_path = self.gacha_log_path / f"{user_id}-{uid}.json"
save_path_bak = self.gacha_log_path / f"{user_id}-{uid}.json.bak"
2022-10-08 08:50:02 +00:00
# 将旧数据备份一次
with contextlib.suppress(PermissionError):
if save_path.exists():
if save_path_bak.exists():
save_path_bak.unlink()
2022-10-09 03:18:49 +00:00
save_path.rename(save_path.parent / f"{save_path.name}.bak")
2022-10-08 08:50:02 +00:00
# 写入数据
await self.save_json(save_path, info.json())
2022-10-08 08:50:02 +00:00
async def gacha_log_to_zzzgf(self, user_id: str, uid: str) -> Optional[Path]:
"""调频日记转换为 ZZZGF 格式
2022-10-08 08:50:02 +00:00
:param user_id: 用户ID
:param uid: 游戏UID
:return: 转换是否成功转换信息ZZZGF 文件目录
2022-10-08 08:50:02 +00:00
"""
data, state = await self.load_history_info(user_id, uid)
2022-10-08 08:50:02 +00:00
if not state:
raise GachaLogNotFound
save_path = self.gacha_log_path / f"{user_id}-{uid}-zzzgf.json"
info = ZZZGFModel(
info=ZZZGFInfo(uid=uid, export_app=ImportType.PaiGram.value, export_app_version="v4"), list=[]
)
2022-10-08 08:50:02 +00:00
for items in data.item_list.values():
for item in items:
info.list.append(
ZZZGFItem(
id=item.id,
name=item.name,
gacha_id=item.gacha_id,
gacha_type=item.gacha_type,
item_id=item.item_id,
item_type=item.item_type,
rank_type=item.rank_type,
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
)
2022-10-09 03:18:49 +00:00
)
2022-10-29 07:40:40 +00:00
await self.save_json(save_path, json.loads(info.json()))
return save_path
2022-10-08 08:50:02 +00:00
2022-10-08 15:40:15 +00:00
@staticmethod
async def verify_data(data: List[GachaItem]) -> bool:
2022-10-08 15:40:15 +00:00
try:
total = len(data)
five_star = len([i for i in data if i.rank_type == "5"])
four_star = len([i for i in data if i.rank_type == "4"])
if total > 50:
if total <= five_star * 15:
raise GachaLogFileError(
"检测到您将要导入的调频记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
)
2022-10-08 15:40:15 +00:00
if four_star < five_star:
raise GachaLogFileError(
"检测到您将要导入的调频记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
)
return True
2022-10-11 06:45:07 +00:00
except Exception as exc: # pylint: disable=W0703
raise GachaLogFileError from exc
2022-10-08 15:40:15 +00:00
@staticmethod
def import_data_backend(all_items: List[GachaItem], gacha_log: GachaLogInfo, temp_id_data: Dict) -> int:
new_num = 0
for item_info in all_items:
pool_name = GACHA_TYPE_LIST[ZZZBannerType(int(item_info.gacha_type))]
if pool_name not in temp_id_data:
temp_id_data[pool_name] = []
if pool_name not in gacha_log.item_list:
gacha_log.item_list[pool_name] = []
if item_info.id not in temp_id_data[pool_name]:
gacha_log.item_list[pool_name].append(item_info)
temp_id_data[pool_name].append(item_info.id)
new_num += 1
return new_num
async def import_gacha_log_data(self, user_id: int, player_id: int, data: dict, verify_uid: bool = True) -> int:
2022-10-08 08:50:02 +00:00
new_num = 0
try:
uid = data["info"]["uid"]
if not verify_uid:
uid = player_id
elif int(uid) != player_id:
raise GachaLogAccountNotFound
try:
import_type = ImportType(data["info"]["export_app"])
except ValueError:
import_type = ImportType.UNKNOWN
2022-10-08 15:40:15 +00:00
# 检查导入数据是否合法
all_items = [GachaItem(**i) for i in data["list"]]
await self.verify_data(all_items)
gacha_log, status = await self.load_history_info(str(user_id), uid)
# 将唯一 id 放入临时数据中,加快查找速度
temp_id_data = {
pool_name: [i.id for i in pool_data] for pool_name, pool_data in gacha_log.item_list.items()
}
# 使用新线程进行遍历,避免堵塞主线程
loop = asyncio.get_event_loop()
# 可以使用with语句来确保线程执行完成后及时被清理
with ThreadPoolExecutor() as executor:
new_num = await loop.run_in_executor(
executor, self.import_data_backend, all_items, gacha_log, temp_id_data
)
2022-10-08 08:50:02 +00:00
for i in gacha_log.item_list.values():
2022-10-08 15:40:15 +00:00
# 检查导入后的数据是否合法
await self.verify_data(i)
2022-10-08 08:50:02 +00:00
i.sort(key=lambda x: (x.time, x.id))
gacha_log.update_time = datetime.datetime.now()
gacha_log.import_type = import_type.value
await self.save_gacha_log_info(str(user_id), uid, gacha_log)
return new_num
except GachaLogAccountNotFound as e:
raise GachaLogAccountNotFound("导入失败,文件包含的调频记录所属 uid 与你当前绑定的 uid 不同") from e
except GachaLogMixedProvider as e:
raise GachaLogMixedProvider from e
except Exception as exc:
raise GachaLogException from exc
2022-10-08 08:50:02 +00:00
2023-07-19 03:41:40 +00:00
@staticmethod
def get_game_client(player_id: int) -> ZZZClient:
if recognize_zzz_server(player_id) in ["prod_gf_cn"]:
return ZZZClient(player_id=player_id, region=Region.CHINESE, lang="zh-cn")
else:
return ZZZClient(player_id=player_id, region=Region.OVERSEAS, lang="zh-cn")
2023-07-19 03:41:40 +00:00
async def get_gacha_log_data(self, user_id: int, player_id: int, authkey: str) -> int:
"""使用authkey获取调频记录数据并合并旧数据
2022-10-08 08:50:02 +00:00
:param user_id: 用户id
2023-07-19 03:41:40 +00:00
:param player_id: 玩家id
2022-10-08 08:50:02 +00:00
:param authkey: authkey
:return: 更新结果
"""
new_num = 0
2023-07-19 03:41:40 +00:00
gacha_log, _ = await self.load_history_info(str(user_id), str(player_id))
# 将唯一 id 放入临时数据中,加快查找速度
temp_id_data = {pool_name: {i.id: i for i in pool_data} for pool_name, pool_data in gacha_log.item_list.items()}
2023-07-19 03:41:40 +00:00
client = self.get_game_client(player_id)
2022-10-08 08:50:02 +00:00
try:
for pool_id, pool_name in GACHA_TYPE_LIST.items():
wish_history = await client.wish_history(pool_id.value, authkey=authkey)
for data in wish_history:
2022-10-08 08:50:02 +00:00
item = GachaItem(
id=str(data.id),
name=data.name,
gacha_id=str(data.banner_id),
2022-10-08 08:50:02 +00:00
gacha_type=str(data.banner_type.value),
item_id=str(data.item_id),
2022-10-08 08:50:02 +00:00
item_type=data.type,
rank_type=str(data.rarity),
2022-10-09 03:18:49 +00:00
time=datetime.datetime(
data.time.year,
data.time.month,
data.time.day,
data.time.hour,
data.time.minute,
data.time.second,
),
2022-10-08 08:50:02 +00:00
)
if pool_name not in temp_id_data:
temp_id_data[pool_name] = {}
if pool_name not in gacha_log.item_list:
gacha_log.item_list[pool_name] = []
if item.id not in temp_id_data[pool_name].keys():
2022-10-08 08:50:02 +00:00
gacha_log.item_list[pool_name].append(item)
temp_id_data[pool_name][item.id] = item
2022-10-08 08:50:02 +00:00
new_num += 1
else:
old_item: GachaItem = temp_id_data[pool_name][item.id]
old_item.gacha_id = item.gacha_id
old_item.item_id = item.item_id
except AuthkeyTimeout as exc:
raise GachaLogAuthkeyTimeout from exc
except InvalidAuthkey as exc:
raise GachaLogInvalidAuthkey from exc
2023-07-19 03:41:40 +00:00
finally:
await client.shutdown()
2022-10-08 08:50:02 +00:00
for i in gacha_log.item_list.values():
i.sort(key=lambda x: (x.time, x.id))
gacha_log.update_time = datetime.datetime.now()
gacha_log.import_type = ImportType.PaiGram.value
2023-07-19 03:41:40 +00:00
await self.save_gacha_log_info(str(user_id), str(player_id), gacha_log)
return new_num
2022-10-08 08:50:02 +00:00
@staticmethod
def check_avatar_up(name: str, gacha_time: datetime.datetime) -> bool:
if name in {"莱卡恩", "猫又", "格莉丝", "丽娜", "「11号」", "珂蕾妲"}:
2022-10-08 08:50:02 +00:00
return False
return True
async def get_all_5_star_items(self, data: List[GachaItem], assets: "AssetsService", pool_name: str = "代理人调频"):
2022-10-08 08:50:02 +00:00
"""
获取所有5星代理人
:param data: 调频记录
2022-10-08 08:50:02 +00:00
:param assets: 资源服务
:param pool_name: 池子名称
:return: 5星代理人列表
2022-10-08 08:50:02 +00:00
"""
count = 0
result = []
for item in data:
count += 1
2022-10-09 03:18:49 +00:00
if item.rank_type == "5":
if item.item_type == "代理人" and pool_name in {"代理人调频", "常驻调频"}:
if pool_name == "代理人调频":
isUp, isBig = (
self.check_avatar_up(item.name, item.time),
(not result[-1].isUp) if result else False,
)
else:
isUp, isBig = False, False
2022-10-10 03:37:58 +00:00
data = {
"name": item.name,
"icon": assets.avatar.normal(item.name).as_uri(),
2022-10-10 03:37:58 +00:00
"count": count,
"type": "代理人",
"isUp": isUp,
"isBig": isBig,
2022-10-10 03:37:58 +00:00
"time": item.time,
}
result.append(FiveStarItem.construct(**data))
elif item.item_type == "音擎" and pool_name in {"音擎调频", "常驻调频"}:
2022-10-10 03:37:58 +00:00
data = {
"name": item.name,
"icon": assets.weapon.icon(item.name).as_uri(),
2022-10-10 03:37:58 +00:00
"count": count,
"type": "音擎",
"isUp": False,
"isBig": False,
"time": item.time,
}
result.append(FiveStarItem.construct(**data))
elif item.item_type == "邦布" and pool_name in {"邦布调频"}:
data = {
"name": item.name,
"icon": assets.buddy.icon(item.name).as_uri(),
"count": count,
"type": "邦布",
2022-10-10 03:37:58 +00:00
"isUp": False,
"isBig": False,
"time": item.time,
}
result.append(FiveStarItem.construct(**data))
2022-10-08 08:50:02 +00:00
count = 0
result.reverse()
return result, count
@staticmethod
async def get_all_4_star_items(data: List[GachaItem], assets: "AssetsService"):
2022-10-08 08:50:02 +00:00
"""
获取 no_fout_star
:param data: 调频记录
2022-10-08 08:50:02 +00:00
:param assets: 资源服务
:return: no_fout_star
"""
count = 0
result = []
for item in data:
count += 1
2022-10-09 03:18:49 +00:00
if item.rank_type == "4":
if item.item_type == "代理人":
data = {
"name": item.name,
"icon": assets.avatar.normal(item.name).as_uri(),
"count": count,
"type": "代理人",
"time": item.time,
}
result.append(FourStarItem.construct(**data))
elif item.item_type == "音擎":
2022-10-10 03:37:58 +00:00
data = {
"name": item.name,
"icon": assets.weapon.icon(item.name).as_uri(),
2022-10-10 03:37:58 +00:00
"count": count,
"type": "音擎",
2022-10-10 03:37:58 +00:00
"time": item.time,
}
result.append(FourStarItem.construct(**data))
elif item.item_type == "邦布":
2022-10-10 03:37:58 +00:00
data = {
"name": item.name,
"icon": assets.buddy.icon(item.name).as_uri(),
2022-10-10 03:37:58 +00:00
"count": count,
"type": "邦布",
2022-10-10 03:37:58 +00:00
"time": item.time,
}
result.append(FourStarItem.construct(**data))
2022-10-08 08:50:02 +00:00
count = 0
result.reverse()
return result, count
@staticmethod
def get_2_pool_data(total: int, all_five: List[FiveStarItem], no_five_star: int, no_four_star: int):
2022-10-08 08:50:02 +00:00
# 总共五星
five_star = len(all_five)
five_star_up = len([i for i in all_five if i.isUp])
five_star_big = len([i for i in all_five if i.isBig])
# 五星平均
five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
2022-10-08 08:50:02 +00:00
# 小保底不歪
2022-10-09 03:18:49 +00:00
small_protect = (
round((five_star_up - five_star_big) / (five_star - five_star_big) * 100.0, 1)
if five_star - five_star_big != 0
else "0.0"
)
2022-10-08 08:50:02 +00:00
# 五星常驻
five_star_const = five_star - five_star_up
# UP 平均
up_avg = (
round((total - no_five_star - (all_five[0].count if not all_five[0].isUp else 0)) / five_star_up, 2)
if five_star_up != 0
else 0
)
2022-10-08 08:50:02 +00:00
# UP 花费原石
up_cost = sum(i.count * 160 for i in all_five if i.isUp)
up_cost = f"{round(up_cost / 10000, 2)}w" if up_cost >= 10000 else up_cost
return [
[
{"num": no_five_star, "unit": "", "lable": "未出五星"},
{"num": five_star, "unit": "", "lable": "五星"},
{"num": five_star_avg, "unit": "", "lable": "五星平均"},
{"num": small_protect, "unit": "%", "lable": "小保底不歪"},
{"num": no_four_star, "unit": "", "lable": "未出四星"},
{"num": five_star_const, "unit": "", "lable": "五星常驻"},
{"num": up_avg, "unit": "", "lable": "UP平均"},
2024-07-15 08:32:25 +00:00
{"num": up_cost, "unit": "", "lable": "UP花费菲林"},
2022-10-09 03:18:49 +00:00
],
2022-10-08 08:50:02 +00:00
]
@staticmethod
def get_1_pool_data(
total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
2022-10-09 03:18:49 +00:00
):
2022-10-08 08:50:02 +00:00
# 总共五星
five_star = len(all_five)
# 五星平均
five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
# 五星音擎
five_star_weapon = len([i for i in all_five if i.type == "音擎"])
2022-10-08 08:50:02 +00:00
# 总共四星
four_star = len(all_four)
# 四星平均
four_star_avg = round((total - no_four_star) / four_star, 2) if four_star != 0 else 0
2022-10-08 08:50:02 +00:00
# 四星最多
four_star_name_list = [i.name for i in all_four]
four_star_max = max(four_star_name_list, key=four_star_name_list.count) if four_star_name_list else ""
2022-10-08 08:50:02 +00:00
four_star_max_count = four_star_name_list.count(four_star_max)
return [
[
{"num": no_five_star, "unit": "", "lable": "未出五星"},
{"num": five_star, "unit": "", "lable": "五星"},
{"num": five_star_avg, "unit": "", "lable": "五星平均"},
{"num": five_star_weapon, "unit": "", "lable": "五星音擎"},
2022-10-08 08:50:02 +00:00
{"num": no_four_star, "unit": "", "lable": "未出四星"},
{"num": four_star, "unit": "", "lable": "四星"},
{"num": four_star_avg, "unit": "", "lable": "四星平均"},
{"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
2022-10-09 03:18:49 +00:00
],
2022-10-08 08:50:02 +00:00
]
@staticmethod
def get_3_pool_data(
total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
2022-10-09 03:18:49 +00:00
):
2022-10-08 08:50:02 +00:00
# 总共五星
five_star = len(all_five)
# 五星平均
five_star_avg = round((total - no_five_star) / five_star, 2) if five_star != 0 else 0
# 四星音擎
four_star_weapon = len([i for i in all_four if i.type == "音擎"])
2022-10-08 08:50:02 +00:00
# 总共四星
four_star = len(all_four)
# 四星平均
four_star_avg = round((total - no_four_star) / four_star, 2) if four_star != 0 else 0
2022-10-08 08:50:02 +00:00
# 四星最多
four_star_name_list = [i.name for i in all_four]
four_star_max = max(four_star_name_list, key=four_star_name_list.count) if four_star_name_list else ""
2022-10-08 08:50:02 +00:00
four_star_max_count = four_star_name_list.count(four_star_max)
return [
[
{"num": no_five_star, "unit": "", "lable": "未出五星"},
{"num": five_star, "unit": "", "lable": "五星"},
{"num": five_star_avg, "unit": "", "lable": "五星平均"},
{"num": four_star_weapon, "unit": "", "lable": "四星"},
{"num": no_four_star, "unit": "", "lable": "未出四星"},
{"num": four_star, "unit": "", "lable": "四星"},
{"num": four_star_avg, "unit": "", "lable": "四星平均"},
{"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
],
]
@staticmethod
def count_fortune(pool_name: str, summon_data, weapon: bool = False):
"""
代理人 音擎
50以下 45以下
50-60 45-55
60-70 55-65
70以上 65以上
"""
data = [45, 55, 65] if weapon else [50, 60, 70]
for i in summon_data:
for j in i:
if j.get("lable") == "五星平均":
num = j.get("num", 0)
if num == 0:
return pool_name
if num <= data[0]:
return f"{pool_name} · 欧"
if num <= data[1]:
return f"{pool_name} · 吉"
if num <= data[2]:
return f"{pool_name} · 普通"
return f"{pool_name} · 非"
return pool_name
async def get_analysis(self, user_id: int, player_id: int, pool: ZZZBannerType, assets: "AssetsService"):
2022-10-08 08:50:02 +00:00
"""
获取调频记录分析数据
2022-10-08 08:50:02 +00:00
:param user_id: 用户id
:param player_id: 玩家id
2022-10-08 08:50:02 +00:00
:param pool: 池子类型
:param assets: 资源服务
:return: 分析数据
"""
gacha_log, status = await self.load_history_info(str(user_id), str(player_id))
2022-10-08 08:50:02 +00:00
if not status:
raise GachaLogNotFound
2022-10-08 08:50:02 +00:00
pool_name = GACHA_TYPE_LIST[pool]
if pool_name not in gacha_log.item_list:
raise GachaLogNotFound
2022-10-08 08:50:02 +00:00
data = gacha_log.item_list[pool_name]
total = len(data)
if total == 0:
raise GachaLogNotFound
all_five, no_five_star = await self.get_all_5_star_items(data, assets, pool_name)
all_four, no_four_star = await self.get_all_4_star_items(data, assets)
2022-10-08 08:50:02 +00:00
summon_data = None
if pool == ZZZBannerType.CHARACTER:
summon_data = self.get_2_pool_data(total, all_five, no_five_star, no_four_star)
pool_name = self.count_fortune(pool_name, summon_data)
elif pool in [ZZZBannerType.WEAPON, ZZZBannerType.BANGBOO]:
summon_data = self.get_3_pool_data(total, all_five, all_four, no_five_star, no_four_star)
pool_name = self.count_fortune(pool_name, summon_data, True)
elif pool == ZZZBannerType.STANDARD:
summon_data = self.get_1_pool_data(total, all_five, all_four, no_five_star, no_four_star)
pool_name = self.count_fortune(pool_name, summon_data)
2022-10-08 08:50:02 +00:00
last_time = data[0].time.strftime("%Y-%m-%d %H:%M")
first_time = data[-1].time.strftime("%Y-%m-%d %H:%M")
return {
"uid": mask_number(player_id),
2022-10-08 08:50:02 +00:00
"allNum": total,
"type": pool.value,
"typeName": pool_name,
"line": summon_data,
"firstTime": first_time,
"lastTime": last_time,
"fiveLog": all_five,
2023-08-29 07:20:02 +00:00
"fourLog": all_four[:36],
2022-10-08 08:50:02 +00:00
}
async def get_pool_analysis(
self, user_id: int, player_id: int, pool: ZZZBannerType, assets: "AssetsService", group: bool
) -> dict:
"""获取调频记录分析数据
2022-10-08 08:50:02 +00:00
:param user_id: 用户id
:param player_id: 玩家id
2022-10-08 08:50:02 +00:00
:param pool: 池子类型
:param assets: 资源服务
:param group: 是否群组
:return: 分析数据
"""
gacha_log, status = await self.load_history_info(str(user_id), str(player_id))
2022-10-08 08:50:02 +00:00
if not status:
raise GachaLogNotFound
2022-10-08 08:50:02 +00:00
pool_name = GACHA_TYPE_LIST[pool]
if pool_name not in gacha_log.item_list:
raise GachaLogNotFound
2022-10-08 08:50:02 +00:00
data = gacha_log.item_list[pool_name]
total = len(data)
if total == 0:
raise GachaLogNotFound
all_five, _ = await self.get_all_5_star_items(data, assets, pool_name)
all_four, _ = await self.get_all_4_star_items(data, assets)
2022-10-08 08:50:02 +00:00
pool_data = []
up_pool_data = [Pool(**i) for i in get_pool_by_id(pool.value)]
for up_pool in up_pool_data:
for item in all_five:
up_pool.parse(item)
for item in all_four:
up_pool.parse(item)
up_pool.count_item(data)
for up_pool in up_pool_data:
2022-10-09 03:18:49 +00:00
pool_data.append(
{
"count": up_pool.count,
"list": up_pool.to_list(),
"name": up_pool.name,
"start": up_pool.start.strftime("%Y-%m-%d"),
"end": up_pool.end.strftime("%Y-%m-%d"),
}
)
2022-10-08 08:50:02 +00:00
pool_data = [i for i in pool_data if i["count"] > 0]
return {
"uid": mask_number(player_id),
2022-10-08 08:50:02 +00:00
"typeName": pool_name,
"pool": pool_data[:6] if group else pool_data,
"hasMore": len(pool_data) > 6,
}
async def get_all_five_analysis(self, user_id: int, player_id: int, assets: "AssetsService") -> dict:
"""获取五星调频记录分析数据
:param user_id: 用户id
:param player_id: 玩家id
:param assets: 资源服务
:return: 分析数据
"""
gacha_log, status = await self.load_history_info(str(user_id), str(player_id))
if not status:
raise GachaLogNotFound
pools = []
for pool_name, items in gacha_log.item_list.items():
pool = Pool(
five=[pool_name],
four=[],
name=pool_name,
to=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
**{"from": "2020-09-28 00:00:00"},
)
all_five, _ = await self.get_all_5_star_items(items, assets, pool_name)
for item in all_five:
pool.parse(item)
pool.count_item(items)
pools.append(pool)
pool_data = [
{
"count": up_pool.count,
"list": up_pool.to_list(),
"name": up_pool.name,
"start": up_pool.start.strftime("%Y-%m-%d"),
"end": up_pool.end.strftime("%Y-%m-%d"),
}
for up_pool in pools
]
return {
"uid": mask_number(player_id),
"typeName": "五星列表",
"pool": pool_data,
"hasMore": False,
}