mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-21 13:48:19 +00:00
97 lines
3.1 KiB
Python
97 lines
3.1 KiB
Python
import asyncio
|
|
from pathlib import Path
|
|
from typing import Optional, Dict, Union
|
|
|
|
import aiofiles
|
|
|
|
from utils.const import PROJECT_ROOT
|
|
|
|
try:
|
|
import ujson as jsonlib
|
|
except ImportError:
|
|
import json as jsonlib
|
|
|
|
|
|
PLAYER_CARDS_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "player_cards")
|
|
PLAYER_CARDS_PATH.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
class PlayerCardsFile:
|
|
_lock = asyncio.Lock()
|
|
|
|
def __init__(self, player_cards_path: Path = PLAYER_CARDS_PATH):
|
|
self.player_cards_path = player_cards_path
|
|
|
|
@staticmethod
|
|
async def load_json(path):
|
|
async with aiofiles.open(path, "r", encoding="utf-8") as f:
|
|
return jsonlib.loads(await f.read())
|
|
|
|
@staticmethod
|
|
async def save_json(path, data: Dict):
|
|
async with aiofiles.open(path, "w", encoding="utf-8") as f:
|
|
return await f.write(jsonlib.dumps(data, ensure_ascii=False, indent=4))
|
|
|
|
def get_file_path(self, uid: Union[str, int]):
|
|
"""获取文件路径
|
|
:param uid: UID
|
|
:return: 文件路径
|
|
"""
|
|
return self.player_cards_path / f"{uid}.json"
|
|
|
|
async def load_history_info(
|
|
self,
|
|
uid: Union[str, int],
|
|
) -> Optional[Dict]:
|
|
"""读取历史记录数据
|
|
:param uid: uid
|
|
:return: 角色历史记录数据
|
|
"""
|
|
file_path = self.get_file_path(uid)
|
|
if not file_path.exists():
|
|
return None
|
|
try:
|
|
return await self.load_json(file_path)
|
|
except jsonlib.JSONDecodeError:
|
|
return None
|
|
|
|
async def merge_info(
|
|
self,
|
|
uid: Union[str, int],
|
|
data: Dict,
|
|
props: Dict,
|
|
) -> Dict:
|
|
assistAvatarList = "assistAvatarList"
|
|
avatarId = "avatarId"
|
|
avatarDetailList = "avatarDetailList"
|
|
avatarList = "avatarList"
|
|
async with self._lock:
|
|
old_data = await self.load_history_info(uid)
|
|
if old_data is None:
|
|
old_data = {}
|
|
avatars = []
|
|
avatar_ids = []
|
|
for avatar in data.get(assistAvatarList, []):
|
|
avatars.append(avatar)
|
|
avatar_ids.append(avatar.get(avatarId, 0))
|
|
for avatar in data.get(avatarDetailList, []):
|
|
if avatar.get(avatarId, 0) in avatar_ids:
|
|
continue
|
|
avatars.append(avatar)
|
|
avatar_ids.append(avatar.get(avatarId, 0))
|
|
data[avatarList] = avatars
|
|
if assistAvatarList in data:
|
|
del data[assistAvatarList]
|
|
if avatarDetailList in data:
|
|
del data[avatarDetailList]
|
|
for i in old_data.get(avatarList, []):
|
|
if i.get(avatarId, 0) not in avatar_ids:
|
|
data[avatarList].append(i)
|
|
for i in data[avatarList]:
|
|
if property_ := props.get(i.get(avatarId, 0)):
|
|
i["property"] = property_
|
|
if i.get("property") is None:
|
|
i["property"] = []
|
|
await self.save_json(self.get_file_path(uid), data)
|
|
return data
|