🐛 修复抽卡记录导入效验

This commit is contained in:
xtaodada 2022-10-09 11:18:49 +08:00
parent 2c944395cb
commit a3a33241c2
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659

View File

@ -18,11 +18,11 @@ from utils.const import PROJECT_ROOT
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
GACHA_TYPE_LIST = {
BannerType.NOVICE: '新手祈愿',
BannerType.PERMANENT: '常驻祈愿',
BannerType.WEAPON: '武器祈愿',
BannerType.CHARACTER1: '角色祈愿',
BannerType.CHARACTER2: '角色祈愿'
BannerType.NOVICE: "新手祈愿",
BannerType.PERMANENT: "常驻祈愿",
BannerType.WEAPON: "武器祈愿",
BannerType.CHARACTER1: "角色祈愿",
BannerType.CHARACTER2: "角色祈愿",
}
@ -52,28 +52,28 @@ class GachaItem(BaseModel):
rank_type: str
time: datetime.datetime
@validator('name')
@validator("name")
def name_validator(cls, v):
if (roleToId(v) or weaponToId(v)) and not not_real_roles(v):
if (roleToId(v) or weaponToId(v)) and v not in not_real_roles:
return v
raise ValueError('Invalid name')
raise ValueError("Invalid name")
@validator('gacha_type')
@validator("gacha_type")
def check_gacha_type(cls, v):
if v not in {"200", "301", "302", "400"}:
raise ValueError("gacha_type must be 200, 301, 302 or 400")
return v
@validator('item_type')
@validator("item_type")
def check_item_type(cls, item):
if item not in {'角色', '武器'}:
raise ValueError('error item type')
if item not in {"角色", "武器"}:
raise ValueError("error item type")
return item
@validator('rank_type')
@validator("rank_type")
def check_rank_type(cls, rank):
if rank not in {'5', '4', '3'}:
raise ValueError('error rank type')
if rank not in {"5", "4", "3"}:
raise ValueError("error rank type")
return rank
@ -82,10 +82,10 @@ class GachaLogInfo(BaseModel):
uid: str
update_time: datetime.datetime
item_list: Dict[str, List[GachaItem]] = {
'角色祈愿': [],
'武器祈愿': [],
'常驻祈愿': [],
'新手祈愿': [],
"角色祈愿": [],
"武器祈愿": [],
"常驻祈愿": [],
"新手祈愿": [],
}
@ -132,33 +132,31 @@ class Pool:
class GachaLog:
@staticmethod
async def load_json(path):
async with aiofiles.open(path, 'r', encoding='utf-8') as f:
async with aiofiles.open(path, "r", encoding="utf-8") as f:
return json.loads(await f.read())
@staticmethod
async def save_json(path, data):
async with aiofiles.open(path, 'w', encoding='utf-8') as f:
async with aiofiles.open(path, "w", encoding="utf-8") as f:
if isinstance(data, dict):
return await f.write(json.dumps(data, ensure_ascii=False, indent=4))
await f.write(data)
@staticmethod
async def load_history_info(user_id: str, uid: str, only_status: bool = False) -> Tuple[Optional[GachaLogInfo], bool]:
async def load_history_info(
user_id: str, uid: str, only_status: bool = False
) -> Tuple[Optional[GachaLogInfo], bool]:
"""读取历史抽卡记录数据
:param user_id: 用户id
:param uid: 原神uid
:param only_status: 是否只读取状态
:return: 抽卡记录数据
"""
file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
file_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json"
if only_status:
return None, file_path.exists()
if not file_path.exists():
return GachaLogInfo(
user_id=user_id,
uid=uid,
update_time=datetime.datetime.now()
), False
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
try:
return GachaLogInfo.parse_obj(await GachaLog.load_json(file_path)), True
except json.decoder.JSONDecodeError:
@ -171,9 +169,9 @@ class GachaLog:
:param uid: 原神uid
:return: 是否删除成功
"""
file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
file_bak_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json.bak'
file_export_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json'
file_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json"
file_bak_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json.bak"
file_export_path = GACHA_LOG_PATH / f"{user_id}-{uid}-uigf.json"
with contextlib.suppress(Exception):
file_bak_path.unlink(missing_ok=True)
with contextlib.suppress(Exception):
@ -193,14 +191,14 @@ class GachaLog:
:param uid: 原神uid
:param info: 抽卡记录数据
"""
save_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
save_path_bak = GACHA_LOG_PATH / f'{user_id}-{uid}.json.bak'
save_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json"
save_path_bak = GACHA_LOG_PATH / f"{user_id}-{uid}.json.bak"
# 将旧数据备份一次
with contextlib.suppress(PermissionError):
if save_path.exists():
if save_path_bak.exists():
save_path_bak.unlink()
save_path.rename(save_path.parent / f'{save_path.name}.bak')
save_path.rename(save_path.parent / f"{save_path.name}.bak")
# 写入数据
await GachaLog.save_json(save_path, info.json())
@ -213,35 +211,37 @@ class GachaLog:
"""
data, state = await GachaLog.load_history_info(user_id, uid)
if not state:
return False, '派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~', None
save_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json'
return False, "派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~", None
save_path = GACHA_LOG_PATH / f"{user_id}-{uid}-uigf.json"
uigf_dict = {
'info': {
'uid': uid,
'lang': 'zh-cn',
'export_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'export_timestamp': int(time.time()),
'export_app': 'TGPaimonBot',
'export_app_version': "v3",
'uigf_version': 'v2.2'
"info": {
"uid": uid,
"lang": "zh-cn",
"export_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"export_timestamp": int(time.time()),
"export_app": "TGPaimonBot",
"export_app_version": "v3",
"uigf_version": "v2.2",
},
'list': []
"list": [],
}
for items in data.item_list.values():
for item in items:
uigf_dict['list'].append({
'gacha_type': item.gacha_type,
'item_id': '',
'count': '1',
'time': item.time.strftime('%Y-%m-%d %H:%M:%S'),
'name': item.name,
'item_type': item.item_type,
'rank_type': item.rank_type,
'id': item.id,
'uigf_gacha_type': item.gacha_type
})
uigf_dict["list"].append(
{
"gacha_type": item.gacha_type,
"item_id": "",
"count": "1",
"time": item.time.strftime("%Y-%m-%d %H:%M:%S"),
"name": item.name,
"item_type": item.item_type,
"rank_type": item.rank_type,
"id": item.id,
"uigf_gacha_type": item.gacha_type,
}
)
await GachaLog.save_json(save_path, uigf_dict)
return True, '', save_path
return True, "", save_path
@staticmethod
async def verify_data(data: List[GachaItem]):
@ -263,14 +263,14 @@ class GachaLog:
new_num = 0
try:
# 检查导入数据是否合法
status, text = await GachaLog.verify_data([GachaItem(**i) for i in data['list']])
status, text = await GachaLog.verify_data([GachaItem(**i) for i in data["list"]])
if not status:
return text
uid = data['info']['uid']
uid = data["info"]["uid"]
int(uid)
gacha_log, _ = await GachaLog.load_history_info(str(user_id), uid)
for item in data['list']:
pool_name = GACHA_TYPE_LIST[BannerType(int(item['gacha_type']))]
for item in data["list"]:
pool_name = GACHA_TYPE_LIST[BannerType(int(item["gacha_type"]))]
item_info = GachaItem.parse_obj(item)
if item_info not in gacha_log.item_list[pool_name]:
gacha_log.item_list[pool_name].append(item_info)
@ -307,12 +307,14 @@ class GachaLog:
gacha_type=str(data.banner_type.value),
item_type=data.type,
rank_type=str(data.rarity),
time=datetime.datetime(data.time.year,
time=datetime.datetime(
data.time.year,
data.time.month,
data.time.day,
data.time.hour,
data.time.minute,
data.time.second)
data.time.second,
),
)
if item not in gacha_log.item_list[pool_name]:
@ -324,11 +326,11 @@ class GachaLog:
i.sort(key=lambda x: (x.time, x.id))
gacha_log.update_time = datetime.datetime.now()
await GachaLog.save_gacha_log_info(str(user_id), str(client.uid), gacha_log)
return '更新完成,本次没有新增数据' if new_num == 0 else f'更新完成,本次共新增{new_num}条抽卡记录'
return "更新完成,本次没有新增数据" if new_num == 0 else f"更新完成,本次共新增{new_num}条抽卡记录"
@staticmethod
def check_avatar_up(name: str, gacha_time: datetime.datetime) -> bool:
if name in {'莫娜', '七七', '迪卢克', ''}:
if name in {"莫娜", "七七", "迪卢克", ""}:
return False
elif name == "刻晴":
start_time = datetime.datetime.strptime("2021-02-17 18:00:00", "%Y-%m-%d %H:%M:%S")
@ -355,7 +357,7 @@ class GachaLog:
result = []
for item in data:
count += 1
if item.rank_type == '5':
if item.rank_type == "5":
if item.item_type == "角色" and pool_name in {"角色祈愿", "常驻祈愿"}:
result.append(
FiveStarItem(
@ -396,7 +398,7 @@ class GachaLog:
result = []
for item in data:
count += 1
if item.rank_type == '4':
if item.rank_type == "4":
if item.item_type == "角色":
result.append(
FourStarItem(
@ -422,10 +424,7 @@ class GachaLog:
return result, count
@staticmethod
def get_301_pool_data(total: int,
all_five: List[FiveStarItem],
no_five_star: int,
no_four_star: int):
def get_301_pool_data(total: int, all_five: List[FiveStarItem], no_five_star: int, no_four_star: int):
# 总共五星
five_star = len(all_five)
five_star_up = len([i for i in all_five if i.isUp])
@ -433,8 +432,11 @@ class GachaLog:
# 五星平均
five_star_avg = round(total / five_star, 2) if five_star != 0 else 0
# 小保底不歪
small_protect = round((five_star_up - five_star_big) / (five_star - five_star_big) * 100.0, 1) if \
five_star - five_star_big != 0 else "0.0"
small_protect = (
round((five_star_up - five_star_big) / (five_star - five_star_big) * 100.0, 1)
if five_star - five_star_big != 0
else "0.0"
)
# 五星常驻
five_star_const = five_star - five_star_up
# UP 平均
@ -454,12 +456,13 @@ class GachaLog:
{"num": five_star_const, "unit": "", "lable": "五星常驻"},
{"num": up_avg, "unit": "", "lable": "UP平均"},
{"num": up_cost, "unit": "", "lable": "UP花费原石"},
]
],
]
@staticmethod
def get_200_pool_data(total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem],
no_five_star: int, no_four_star: int):
def get_200_pool_data(
total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
):
# 总共五星
five_star = len(all_five)
# 五星平均
@ -486,12 +489,13 @@ class GachaLog:
{"num": four_star, "unit": "", "lable": "四星"},
{"num": four_star_avg, "unit": "", "lable": "四星平均"},
{"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
]
],
]
@staticmethod
def get_302_pool_data(total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem],
no_five_star: int, no_four_star: int):
def get_302_pool_data(
total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int
):
# 总共五星
five_star = len(all_five)
# 五星平均
@ -518,7 +522,7 @@ class GachaLog:
{"num": four_star, "unit": "", "lable": "四星"},
{"num": four_star_avg, "unit": "", "lable": "四星平均"},
{"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"},
]
],
]
@staticmethod
@ -592,13 +596,15 @@ class GachaLog:
up_pool.parse(item)
up_pool.count_item(data)
for up_pool in up_pool_data:
pool_data.append({
pool_data.append(
{
"count": up_pool.count,
"list": up_pool.to_list(),
"name": up_pool.name,
"start": up_pool.start.strftime("%Y-%m-%d"),
"end": up_pool.end.strftime("%Y-%m-%d"),
})
}
)
pool_data = [i for i in pool_data if i["count"] > 0]
return {
"uid": client.uid,