diff --git a/modules/apihelper/gacha_log.py b/modules/apihelper/gacha_log.py index cd68a7f..f70af84 100644 --- a/modules/apihelper/gacha_log.py +++ b/modules/apihelper/gacha_log.py @@ -18,11 +18,11 @@ from utils.const import PROJECT_ROOT GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log") GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True) GACHA_TYPE_LIST = { - BannerType.NOVICE: '新手祈愿', - BannerType.PERMANENT: '常驻祈愿', - BannerType.WEAPON: '武器祈愿', - BannerType.CHARACTER1: '角色祈愿', - BannerType.CHARACTER2: '角色祈愿' + BannerType.NOVICE: "新手祈愿", + BannerType.PERMANENT: "常驻祈愿", + BannerType.WEAPON: "武器祈愿", + BannerType.CHARACTER1: "角色祈愿", + BannerType.CHARACTER2: "角色祈愿", } @@ -52,28 +52,28 @@ class GachaItem(BaseModel): rank_type: str time: datetime.datetime - @validator('name') + @validator("name") def name_validator(cls, v): - if (roleToId(v) or weaponToId(v)) and not not_real_roles(v): + if (roleToId(v) or weaponToId(v)) and v not in not_real_roles: return v - raise ValueError('Invalid name') + raise ValueError("Invalid name") - @validator('gacha_type') + @validator("gacha_type") def check_gacha_type(cls, v): if v not in {"200", "301", "302", "400"}: raise ValueError("gacha_type must be 200, 301, 302 or 400") return v - @validator('item_type') + @validator("item_type") def check_item_type(cls, item): - if item not in {'角色', '武器'}: - raise ValueError('error item type') + if item not in {"角色", "武器"}: + raise ValueError("error item type") return item - @validator('rank_type') + @validator("rank_type") def check_rank_type(cls, rank): - if rank not in {'5', '4', '3'}: - raise ValueError('error rank type') + if rank not in {"5", "4", "3"}: + raise ValueError("error rank type") return rank @@ -82,10 +82,10 @@ class GachaLogInfo(BaseModel): uid: str update_time: datetime.datetime item_list: Dict[str, List[GachaItem]] = { - '角色祈愿': [], - '武器祈愿': [], - '常驻祈愿': [], - '新手祈愿': [], + "角色祈愿": [], + "武器祈愿": [], + "常驻祈愿": [], + "新手祈愿": [], } @@ -132,33 +132,31 @@ class Pool: class GachaLog: @staticmethod async def load_json(path): - async with aiofiles.open(path, 'r', encoding='utf-8') as f: + async with aiofiles.open(path, "r", encoding="utf-8") as f: return json.loads(await f.read()) @staticmethod async def save_json(path, data): - async with aiofiles.open(path, 'w', encoding='utf-8') as f: + async with aiofiles.open(path, "w", encoding="utf-8") as f: if isinstance(data, dict): return await f.write(json.dumps(data, ensure_ascii=False, indent=4)) await f.write(data) @staticmethod - async def load_history_info(user_id: str, uid: str, only_status: bool = False) -> Tuple[Optional[GachaLogInfo], bool]: + async def load_history_info( + user_id: str, uid: str, only_status: bool = False + ) -> Tuple[Optional[GachaLogInfo], bool]: """读取历史抽卡记录数据 :param user_id: 用户id :param uid: 原神uid :param only_status: 是否只读取状态 :return: 抽卡记录数据 """ - file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json' + file_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json" if only_status: return None, file_path.exists() if not file_path.exists(): - return GachaLogInfo( - user_id=user_id, - uid=uid, - update_time=datetime.datetime.now() - ), False + return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False try: return GachaLogInfo.parse_obj(await GachaLog.load_json(file_path)), True except json.decoder.JSONDecodeError: @@ -171,9 +169,9 @@ class GachaLog: :param uid: 原神uid :return: 是否删除成功 """ - file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json' - file_bak_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json.bak' - file_export_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json' + file_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json" + file_bak_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json.bak" + file_export_path = GACHA_LOG_PATH / f"{user_id}-{uid}-uigf.json" with contextlib.suppress(Exception): file_bak_path.unlink(missing_ok=True) with contextlib.suppress(Exception): @@ -193,14 +191,14 @@ class GachaLog: :param uid: 原神uid :param info: 抽卡记录数据 """ - save_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json' - save_path_bak = GACHA_LOG_PATH / f'{user_id}-{uid}.json.bak' + save_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json" + save_path_bak = GACHA_LOG_PATH / f"{user_id}-{uid}.json.bak" # 将旧数据备份一次 with contextlib.suppress(PermissionError): if save_path.exists(): if save_path_bak.exists(): save_path_bak.unlink() - save_path.rename(save_path.parent / f'{save_path.name}.bak') + save_path.rename(save_path.parent / f"{save_path.name}.bak") # 写入数据 await GachaLog.save_json(save_path, info.json()) @@ -213,35 +211,37 @@ class GachaLog: """ data, state = await GachaLog.load_history_info(user_id, uid) if not state: - return False, '派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~', None - save_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json' + return False, "派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~", None + save_path = GACHA_LOG_PATH / f"{user_id}-{uid}-uigf.json" uigf_dict = { - 'info': { - 'uid': uid, - 'lang': 'zh-cn', - 'export_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'export_timestamp': int(time.time()), - 'export_app': 'TGPaimonBot', - 'export_app_version': "v3", - 'uigf_version': 'v2.2' + "info": { + "uid": uid, + "lang": "zh-cn", + "export_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "export_timestamp": int(time.time()), + "export_app": "TGPaimonBot", + "export_app_version": "v3", + "uigf_version": "v2.2", }, - 'list': [] + "list": [], } for items in data.item_list.values(): for item in items: - uigf_dict['list'].append({ - 'gacha_type': item.gacha_type, - 'item_id': '', - 'count': '1', - 'time': item.time.strftime('%Y-%m-%d %H:%M:%S'), - 'name': item.name, - 'item_type': item.item_type, - 'rank_type': item.rank_type, - 'id': item.id, - 'uigf_gacha_type': item.gacha_type - }) + uigf_dict["list"].append( + { + "gacha_type": item.gacha_type, + "item_id": "", + "count": "1", + "time": item.time.strftime("%Y-%m-%d %H:%M:%S"), + "name": item.name, + "item_type": item.item_type, + "rank_type": item.rank_type, + "id": item.id, + "uigf_gacha_type": item.gacha_type, + } + ) await GachaLog.save_json(save_path, uigf_dict) - return True, '', save_path + return True, "", save_path @staticmethod async def verify_data(data: List[GachaItem]): @@ -263,14 +263,14 @@ class GachaLog: new_num = 0 try: # 检查导入数据是否合法 - status, text = await GachaLog.verify_data([GachaItem(**i) for i in data['list']]) + status, text = await GachaLog.verify_data([GachaItem(**i) for i in data["list"]]) if not status: return text - uid = data['info']['uid'] + uid = data["info"]["uid"] int(uid) gacha_log, _ = await GachaLog.load_history_info(str(user_id), uid) - for item in data['list']: - pool_name = GACHA_TYPE_LIST[BannerType(int(item['gacha_type']))] + for item in data["list"]: + pool_name = GACHA_TYPE_LIST[BannerType(int(item["gacha_type"]))] item_info = GachaItem.parse_obj(item) if item_info not in gacha_log.item_list[pool_name]: gacha_log.item_list[pool_name].append(item_info) @@ -307,12 +307,14 @@ class GachaLog: gacha_type=str(data.banner_type.value), item_type=data.type, rank_type=str(data.rarity), - time=datetime.datetime(data.time.year, - data.time.month, - data.time.day, - data.time.hour, - data.time.minute, - data.time.second) + time=datetime.datetime( + data.time.year, + data.time.month, + data.time.day, + data.time.hour, + data.time.minute, + data.time.second, + ), ) if item not in gacha_log.item_list[pool_name]: @@ -324,11 +326,11 @@ class GachaLog: i.sort(key=lambda x: (x.time, x.id)) gacha_log.update_time = datetime.datetime.now() await GachaLog.save_gacha_log_info(str(user_id), str(client.uid), gacha_log) - return '更新完成,本次没有新增数据' if new_num == 0 else f'更新完成,本次共新增{new_num}条抽卡记录' + return "更新完成,本次没有新增数据" if new_num == 0 else f"更新完成,本次共新增{new_num}条抽卡记录" @staticmethod def check_avatar_up(name: str, gacha_time: datetime.datetime) -> bool: - if name in {'莫娜', '七七', '迪卢克', '琴'}: + if name in {"莫娜", "七七", "迪卢克", "琴"}: return False elif name == "刻晴": start_time = datetime.datetime.strptime("2021-02-17 18:00:00", "%Y-%m-%d %H:%M:%S") @@ -355,7 +357,7 @@ class GachaLog: result = [] for item in data: count += 1 - if item.rank_type == '5': + if item.rank_type == "5": if item.item_type == "角色" and pool_name in {"角色祈愿", "常驻祈愿"}: result.append( FiveStarItem( @@ -396,7 +398,7 @@ class GachaLog: result = [] for item in data: count += 1 - if item.rank_type == '4': + if item.rank_type == "4": if item.item_type == "角色": result.append( FourStarItem( @@ -422,10 +424,7 @@ class GachaLog: return result, count @staticmethod - def get_301_pool_data(total: int, - all_five: List[FiveStarItem], - no_five_star: int, - no_four_star: int): + def get_301_pool_data(total: int, all_five: List[FiveStarItem], no_five_star: int, no_four_star: int): # 总共五星 five_star = len(all_five) five_star_up = len([i for i in all_five if i.isUp]) @@ -433,8 +432,11 @@ class GachaLog: # 五星平均 five_star_avg = round(total / five_star, 2) if five_star != 0 else 0 # 小保底不歪 - small_protect = round((five_star_up - five_star_big) / (five_star - five_star_big) * 100.0, 1) if \ - five_star - five_star_big != 0 else "0.0" + small_protect = ( + round((five_star_up - five_star_big) / (five_star - five_star_big) * 100.0, 1) + if five_star - five_star_big != 0 + else "0.0" + ) # 五星常驻 five_star_const = five_star - five_star_up # UP 平均 @@ -454,12 +456,13 @@ class GachaLog: {"num": five_star_const, "unit": "个", "lable": "五星常驻"}, {"num": up_avg, "unit": "抽", "lable": "UP平均"}, {"num": up_cost, "unit": "", "lable": "UP花费原石"}, - ] + ], ] @staticmethod - def get_200_pool_data(total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], - no_five_star: int, no_four_star: int): + def get_200_pool_data( + total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int + ): # 总共五星 five_star = len(all_five) # 五星平均 @@ -486,12 +489,13 @@ class GachaLog: {"num": four_star, "unit": "个", "lable": "四星"}, {"num": four_star_avg, "unit": "抽", "lable": "四星平均"}, {"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"}, - ] + ], ] @staticmethod - def get_302_pool_data(total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], - no_five_star: int, no_four_star: int): + def get_302_pool_data( + total: int, all_five: List[FiveStarItem], all_four: List[FourStarItem], no_five_star: int, no_four_star: int + ): # 总共五星 five_star = len(all_five) # 五星平均 @@ -518,7 +522,7 @@ class GachaLog: {"num": four_star, "unit": "个", "lable": "四星"}, {"num": four_star_avg, "unit": "抽", "lable": "四星平均"}, {"num": four_star_max_count, "unit": four_star_max, "lable": "四星最多"}, - ] + ], ] @staticmethod @@ -592,13 +596,15 @@ class GachaLog: up_pool.parse(item) up_pool.count_item(data) for up_pool in up_pool_data: - pool_data.append({ - "count": up_pool.count, - "list": up_pool.to_list(), - "name": up_pool.name, - "start": up_pool.start.strftime("%Y-%m-%d"), - "end": up_pool.end.strftime("%Y-%m-%d"), - }) + pool_data.append( + { + "count": up_pool.count, + "list": up_pool.to_list(), + "name": up_pool.name, + "start": up_pool.start.strftime("%Y-%m-%d"), + "end": up_pool.end.strftime("%Y-%m-%d"), + } + ) pool_data = [i for i in pool_data if i["count"] > 0] return { "uid": client.uid,