️ 为抽卡分析添加缓存

This commit is contained in:
xtaodada 2022-10-10 11:37:58 +08:00
parent ccd1eaab8d
commit b1c6e7456f
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
5 changed files with 165 additions and 178 deletions

View File

@ -29,16 +29,15 @@ from utils.typedefs import StrOrInt, StrOrURL
if TYPE_CHECKING: if TYPE_CHECKING:
from multiprocessing.synchronize import RLock from multiprocessing.synchronize import RLock
ICON_TYPE = Union[ ICON_TYPE = Union[Callable[[bool], Awaitable[Optional[Path]]], Callable[..., Awaitable[Optional[Path]]]]
Callable[[bool], Awaitable[Optional[Path]]],
Callable[..., Awaitable[Optional[Path]]]
]
NAME_MAP_TYPE = Dict[str, StrOrURL] NAME_MAP_TYPE = Dict[str, StrOrURL]
ASSETS_PATH = PROJECT_ROOT.joinpath('resources/assets') ASSETS_PATH = PROJECT_ROOT.joinpath("resources/assets")
ASSETS_PATH.mkdir(exist_ok=True, parents=True) ASSETS_PATH.mkdir(exist_ok=True, parents=True)
DATA_MAP = {'avatar': AVATAR_DATA, 'weapon': WEAPON_DATA, 'material': MATERIAL_DATA} DATA_MAP = {"avatar": AVATAR_DATA, "weapon": WEAPON_DATA, "material": MATERIAL_DATA}
DEFAULT_EnkaAssets = EnkaAssets(lang="chs")
class AssetsServiceError(Exception): class AssetsServiceError(Exception):
@ -50,7 +49,7 @@ class AssetsCouldNotFound(AssetsServiceError):
class _AssetsService(ABC): class _AssetsService(ABC):
_lock: ClassVar['RLock'] = Lock() _lock: ClassVar["RLock"] = Lock()
_dir: ClassVar[Path] _dir: ClassVar[Path]
icon_types: ClassVar[list[str]] icon_types: ClassVar[list[str]]
@ -70,7 +69,7 @@ class _AssetsService(ABC):
@cached_property @cached_property
def honey_id(self) -> str: def honey_id(self) -> str:
"""当前资源在 Honey Impact 所对应的 ID""" """当前资源在 Honey Impact 所对应的 ID"""
return HONEY_DATA[self.type].get(str(self.id), [''])[0] return HONEY_DATA[self.type].get(str(self.id), [""])[0]
@property @property
def path(self) -> Path: def path(self) -> Path:
@ -98,26 +97,20 @@ class _AssetsService(ABC):
def __init_subclass__(cls, **kwargs) -> None: def __init_subclass__(cls, **kwargs) -> None:
"""初始化一些类变量""" """初始化一些类变量"""
from itertools import chain from itertools import chain
cls.icon_types = [ # 支持的图标类型 cls.icon_types = [ # 支持的图标类型
k k
for k, v in for k, v in chain(cls.__annotations__.items(), *map(lambda x: x.__annotations__.items(), cls.__bases__))
chain( if v in [ICON_TYPE, "ICON_TYPE"]
cls.__annotations__.items(),
*map(
lambda x: x.__annotations__.items(),
cls.__bases__
)
)
if v in [ICON_TYPE, 'ICON_TYPE']
] ]
cls.type = cls.__name__.lstrip('_').split('Assets')[0].lower() # 当前 assert 的类型 cls.type = cls.__name__.lstrip("_").split("Assets")[0].lower() # 当前 assert 的类型
cls._dir = ASSETS_PATH.joinpath(cls.type) # 图标保存的文件夹 cls._dir = ASSETS_PATH.joinpath(cls.type) # 图标保存的文件夹
cls._dir.mkdir(exist_ok=True, parents=True) cls._dir.mkdir(exist_ok=True, parents=True)
async def _download(self, url: StrOrURL, path: Path, retry: int = 5) -> Path | None: async def _download(self, url: StrOrURL, path: Path, retry: int = 5) -> Path | None:
"""从 url 下载图标至 path""" """从 url 下载图标至 path"""
logger.debug(f"正在从 {url} 下载图标至 {path}") logger.debug(f"正在从 {url} 下载图标至 {path}")
headers = {'user-agent': 'TGPaimonBot/3.0'} if URL(url).host == 'enka.network' else None headers = {"user-agent": "TGPaimonBot/3.0"} if URL(url).host == "enka.network" else None
for time in range(retry): for time in range(retry):
try: try:
response = await self.client.get(url, follow_redirects=False, headers=headers) response = await self.client.get(url, follow_redirects=False, headers=headers)
@ -131,7 +124,7 @@ class _AssetsService(ABC):
continue continue
if response.status_code != 200: # 判定页面是否正常 if response.status_code != 200: # 判定页面是否正常
return None return None
async with async_open(path, 'wb') as file: async with async_open(path, "wb") as file:
await file.write(response.content) # 保存图标 await file.write(response.content) # 保存图标
return path.resolve() return path.resolve()
@ -159,7 +152,7 @@ class _AssetsService(ABC):
if overwrite and path is not None and path.exists(): if overwrite and path is not None and path.exists():
await async_remove(path) await async_remove(path)
# 依次从使用当前 assets class 中的爬虫下载图标,顺序为爬虫名的字母顺序 # 依次从使用当前 assets class 中的爬虫下载图标,顺序为爬虫名的字母顺序
for func in map(lambda x: getattr(self, x), sorted(filter(lambda x: x.startswith('_get_from_'), dir(self)))): for func in map(lambda x: getattr(self, x), sorted(filter(lambda x: x.startswith("_get_from_"), dir(self)))):
if (path := await func(item)) is not None: if (path := await func(item)) is not None:
return path return path
@ -200,26 +193,26 @@ class _AvatarAssets(_AssetsService):
def game_name(self) -> str: def game_name(self) -> str:
icon = "UI_AvatarIcon_" icon = "UI_AvatarIcon_"
if (avatar := AVATAR_DATA.get(str(self.id), None)) is not None: if (avatar := AVATAR_DATA.get(str(self.id), None)) is not None:
icon = avatar['icon'] icon = avatar["icon"]
else: else:
for aid, avatar in AVATAR_DATA.items(): for aid, avatar in AVATAR_DATA.items():
if aid.startswith(str(self.id)): if aid.startswith(str(self.id)):
icon = avatar['icon'] icon = avatar["icon"]
return re.findall(r"UI_AvatarIcon_(.*)", icon)[0] return re.findall(r"UI_AvatarIcon_(.*)", icon)[0]
@cached_property @cached_property
def honey_id(self) -> str: def honey_id(self) -> str:
return HONEY_DATA['avatar'].get(str(self.id), '')[0] return HONEY_DATA["avatar"].get(str(self.id), "")[0]
@cached_property @cached_property
def enka(self) -> Optional[EnkaCharacterAsset]: def enka(self) -> Optional[EnkaCharacterAsset]:
api = getattr(self, '_enka_api', None) api = getattr(self, "_enka_api", None)
cid = getattr(self, 'id', None) cid = getattr(self, "id", None)
return None if api is None or cid is None else api.character(cid) return None if api is None or cid is None else api.character(cid)
def __init__(self, client: Optional[AsyncClient] = None, enka: Optional[EnkaAssets] = None): def __init__(self, client: Optional[AsyncClient] = None, enka: Optional[EnkaAssets] = None):
super().__init__(client) super().__init__(client)
self._enka_api = enka or EnkaAssets(lang='chs') self._enka_api = enka or DEFAULT_EnkaAssets
def __call__(self, target: StrOrInt) -> "_AvatarAssets": def __call__(self, target: StrOrInt) -> "_AvatarAssets":
temp = target temp = target
@ -236,39 +229,33 @@ class _AvatarAssets(_AssetsService):
return result return result
async def _get_from_ambr(self, item: str) -> Path | None: async def _get_from_ambr(self, item: str) -> Path | None:
if item in {'icon', 'side', 'gacha'}: if item in {"icon", "side", "gacha"}:
url = AMBR_HOST.join(f"assets/UI/{self.game_name_map[item]}.png") url = AMBR_HOST.join(f"assets/UI/{self.game_name_map[item]}.png")
return await self._download(url, self.path.joinpath(f"{item}.png")) return await self._download(url, self.path.joinpath(f"{item}.png"))
async def _get_from_enka(self, item: str) -> Path | None: async def _get_from_enka(self, item: str) -> Path | None:
path = self.path.joinpath(f"{item}.png") path = self.path.joinpath(f"{item}.png")
item = 'banner' if item == 'gacha' else item item = "banner" if item == "gacha" else item
# noinspection PyUnboundLocalVariable # noinspection PyUnboundLocalVariable
if ( if self.enka is not None and item in (data := self.enka.images.dict()).keys() and (url := data[item]["url"]):
self.enka is not None
and
item in (data := self.enka.images.dict()).keys()
and
(url := data[item]['url'])
):
return await self._download(url, path) return await self._download(url, path)
@cached_property @cached_property
def honey_name_map(self) -> dict[str, str]: def honey_name_map(self) -> dict[str, str]:
return { return {
'icon': f"{self.honey_id}_icon", "icon": f"{self.honey_id}_icon",
'side': f"{self.honey_id}_side_icon", "side": f"{self.honey_id}_side_icon",
'gacha': f"{self.honey_id}_gacha_splash", "gacha": f"{self.honey_id}_gacha_splash",
'gacha_card': f"{self.honey_id}_gacha_card", "gacha_card": f"{self.honey_id}_gacha_card",
} }
@cached_property @cached_property
def game_name_map(self) -> dict[str, str]: def game_name_map(self) -> dict[str, str]:
return { return {
'icon': f"UI_AvatarIcon_{self.game_name}", "icon": f"UI_AvatarIcon_{self.game_name}",
'card': f"UI_AvatarIcon_{self.game_name}_Card", "card": f"UI_AvatarIcon_{self.game_name}_Card",
'side': f"UI_AvatarIcon_Side_{self.game_name}", "side": f"UI_AvatarIcon_Side_{self.game_name}",
'gacha': f"UI_Gacha_AvatarImg_{self.game_name}", "gacha": f"UI_Gacha_AvatarImg_{self.game_name}",
} }
@ -281,14 +268,14 @@ class _WeaponAssets(_AssetsService):
@cached_property @cached_property
def game_name(self) -> str: def game_name(self) -> str:
return re.findall(r"UI_EquipIcon_(.*)", WEAPON_DATA[str(self.id)]['icon'])[0] return re.findall(r"UI_EquipIcon_(.*)", WEAPON_DATA[str(self.id)]["icon"])[0]
@cached_property @cached_property
def game_name_map(self) -> dict[str, str]: def game_name_map(self) -> dict[str, str]:
return { return {
'icon': f"UI_EquipIcon_{self.game_name}", "icon": f"UI_EquipIcon_{self.game_name}",
'awaken': f"UI_EquipIcon_{self.game_name}_Awaken", "awaken": f"UI_EquipIcon_{self.game_name}_Awaken",
'gacha': f"UI_Gacha_EquipIcon_{self.game_name}" "gacha": f"UI_Gacha_EquipIcon_{self.game_name}",
} }
@cached_property @cached_property
@ -307,16 +294,16 @@ class _WeaponAssets(_AssetsService):
async def _get_from_enka(self, item: str) -> Path | None: async def _get_from_enka(self, item: str) -> Path | None:
if item in self.game_name_map: if item in self.game_name_map:
url = ENKA_HOST.join(f'ui/{self.game_name_map.get(item)}.png') url = ENKA_HOST.join(f"ui/{self.game_name_map.get(item)}.png")
path = self.path.joinpath(f"{item}.png") path = self.path.joinpath(f"{item}.png")
return await self._download(url, path) return await self._download(url, path)
@cached_property @cached_property
def honey_name_map(self) -> dict[str, str]: def honey_name_map(self) -> dict[str, str]:
return { return {
'icon': f'{self.honey_id}', "icon": f"{self.honey_id}",
'awaken': f'{self.honey_id}_awaken_icon', "awaken": f"{self.honey_id}_awaken_icon",
'gacha': f'{self.honey_id}_gacha_icon', "gacha": f"{self.honey_id}_gacha_icon",
} }
@ -327,11 +314,11 @@ class _MaterialAssets(_AssetsService):
@cached_property @cached_property
def game_name_map(self) -> dict[str, str]: def game_name_map(self) -> dict[str, str]:
return {'icon': f"UI_ItemIcon_{self.game_name}"} return {"icon": f"UI_ItemIcon_{self.game_name}"}
@cached_property @cached_property
def honey_name_map(self) -> dict[str, str]: def honey_name_map(self) -> dict[str, str]:
return {'icon': self.honey_id} return {"icon": self.honey_id}
def __call__(self, target: StrOrInt) -> Self: def __call__(self, target: StrOrInt) -> Self:
temp = target temp = target
@ -340,24 +327,24 @@ class _MaterialAssets(_AssetsService):
if target.isnumeric(): if target.isnumeric():
target = int(target) target = int(target)
else: else:
target = {v['name']: int(k) for k, v in MATERIAL_DATA.items()}.get(target) target = {v["name"]: int(k) for k, v in MATERIAL_DATA.items()}.get(target)
if isinstance(target, str) or target is None: if isinstance(target, str) or target is None:
raise AssetsCouldNotFound(f"找不到对应的素材: target={temp}") raise AssetsCouldNotFound(f"找不到对应的素材: target={temp}")
result.id = target result.id = target
return result return result
async def _get_from_ambr(self, item: str) -> Path | None: async def _get_from_ambr(self, item: str) -> Path | None:
if item == 'icon': if item == "icon":
url = AMBR_HOST.join(f"assets/UI/{self.game_name_map.get(item)}.png") url = AMBR_HOST.join(f"assets/UI/{self.game_name_map.get(item)}.png")
path = self.path.joinpath(f"{item}.png") path = self.path.joinpath(f"{item}.png")
return await self._download(url, path) return await self._download(url, path)
async def _get_from_honey(self, item: str) -> Path | None: async def _get_from_honey(self, item: str) -> Path | None:
path = self.path.joinpath(f"{item}.png") path = self.path.joinpath(f"{item}.png")
url = HONEY_HOST.join(f'/img/{self.honey_name_map.get(item)}.png') url = HONEY_HOST.join(f"/img/{self.honey_name_map.get(item)}.png")
if (result := await self._download(url, path)) is None: if (result := await self._download(url, path)) is None:
path = self.path.joinpath(f"{item}.webp") path = self.path.joinpath(f"{item}.webp")
url = HONEY_HOST.join(f'/img/{self.honey_name_map.get(item)}.webp') url = HONEY_HOST.join(f"/img/{self.honey_name_map.get(item)}.webp")
return await self._download(url, path) return await self._download(url, path)
return result return result
@ -380,7 +367,7 @@ class _ArtifactAssets(_AssetsService):
@cached_property @cached_property
def honey_id(self) -> str: def honey_id(self) -> str:
return HONEY_DATA['artifact'][str(self.id)][0] return HONEY_DATA["artifact"][str(self.id)][0]
@cached_property @cached_property
def game_name(self) -> str: def game_name(self) -> str:
@ -388,7 +375,7 @@ class _ArtifactAssets(_AssetsService):
async def _get_from_enka(self, item: str) -> Path | None: async def _get_from_enka(self, item: str) -> Path | None:
if item in self.game_name_map: if item in self.game_name_map:
url = ENKA_HOST.join(f'ui/{self.game_name_map.get(item)}.png') url = ENKA_HOST.join(f"ui/{self.game_name_map.get(item)}.png")
path = self.path.joinpath(f"{item}.png") path = self.path.joinpath(f"{item}.png")
return await self._download(url, path) return await self._download(url, path)
@ -410,7 +397,7 @@ class _ArtifactAssets(_AssetsService):
@cached_property @cached_property
def honey_name_map(self) -> dict[str, str]: def honey_name_map(self) -> dict[str, str]:
first_id = int(re.findall(r'\d+', HONEY_DATA['artifact'][str(self.id)][-1])[0]) first_id = int(re.findall(r"\d+", HONEY_DATA["artifact"][str(self.id)][-1])[0])
return { return {
"icon": f"i_n{first_id + 30}", "icon": f"i_n{first_id + 30}",
"flower": f"i_n{first_id + 30}", "flower": f"i_n{first_id + 30}",
@ -432,43 +419,43 @@ class _NamecardAssets(_AssetsService):
@cached_property @cached_property
def honey_id(self) -> str: def honey_id(self) -> str:
return HONEY_DATA['namecard'][str(self.id)][0] return HONEY_DATA["namecard"][str(self.id)][0]
@cached_property @cached_property
def game_name(self) -> str: def game_name(self) -> str:
return NAMECARD_DATA[str(self.id)]['icon'] return NAMECARD_DATA[str(self.id)]["icon"]
def __call__(self, target: int) -> "_NamecardAssets": def __call__(self, target: int) -> "_NamecardAssets":
result = _NamecardAssets(self.client) result = _NamecardAssets(self.client)
result.id = target result.id = target
result.enka = EnkaAssets(lang='chs').namecards(target) result.enka = DEFAULT_EnkaAssets.namecards(target)
return result return result
async def _get_from_ambr(self, item: str) -> Path | None: async def _get_from_ambr(self, item: str) -> Path | None:
if item == 'profile': if item == "profile":
url = AMBR_HOST.join(f"assets/UI/namecard/{self.game_name_map[item]}.png.png") url = AMBR_HOST.join(f"assets/UI/namecard/{self.game_name_map[item]}.png.png")
return await self._download(url, self.path.joinpath(f"{item}.png")) return await self._download(url, self.path.joinpath(f"{item}.png"))
async def _get_from_enka(self, item: str) -> Path | None: async def _get_from_enka(self, item: str) -> Path | None:
path = self.path.joinpath(f"{item}.png") path = self.path.joinpath(f"{item}.png")
url = getattr(self.enka, {'profile': 'banner'}.get(item, item), None) url = getattr(self.enka, {"profile": "banner"}.get(item, item), None)
if url is not None: if url is not None:
return await self._download(url.url, path) return await self._download(url.url, path)
@cached_property @cached_property
def game_name_map(self) -> dict[str, str]: def game_name_map(self) -> dict[str, str]:
return { return {
'icon': self.game_name, "icon": self.game_name,
'navbar': NAMECARD_DATA[str(self.id)]['navbar'], "navbar": NAMECARD_DATA[str(self.id)]["navbar"],
'profile': NAMECARD_DATA[str(self.id)]['profile'] "profile": NAMECARD_DATA[str(self.id)]["profile"],
} }
@cached_property @cached_property
def honey_name_map(self) -> dict[str, str]: def honey_name_map(self) -> dict[str, str]:
return { return {
'icon': self.honey_id, "icon": self.honey_id,
'navbar': f"{self.honey_id}_back", "navbar": f"{self.honey_id}_back",
'profile': f"{self.honey_id}_profile", "profile": f"{self.honey_id}_profile",
} }
@ -497,8 +484,7 @@ class AssetsService(Service):
def __init__(self): def __init__(self):
for attr, assets_type_name in filter( for attr, assets_type_name in filter(
lambda x: (not x[0].startswith('_')) and x[1].endswith('Assets'), lambda x: (not x[0].startswith("_")) and x[1].endswith("Assets"), self.__annotations__.items()
self.__annotations__.items()
): ):
setattr(self, attr, globals()[assets_type_name]()) setattr(self, attr, globals()[assets_type_name]())
@ -510,4 +496,4 @@ class AssetsService(Service):
logger.info("刷新元数据成功") logger.info("刷新元数据成功")
AssetsServiceType = TypeVar('AssetsServiceType', bound=_AssetsService) AssetsServiceType = TypeVar("AssetsServiceType", bound=_AssetsService)

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import functools
from typing import Any, Generic, ItemsView, Iterator, KeysView, TypeVar from typing import Any, Generic, ItemsView, Iterator, KeysView, TypeVar
import ujson as json import ujson as json
@ -11,16 +12,20 @@ from utils.log import logger
from utils.typedefs import StrOrInt from utils.typedefs import StrOrInt
__all__ = [ __all__ = [
'HONEY_DATA', "HONEY_DATA",
'AVATAR_DATA', 'WEAPON_DATA', 'MATERIAL_DATA', 'ARTIFACT_DATA', 'NAMECARD_DATA', "AVATAR_DATA",
'honey_id_to_game_id', "WEAPON_DATA",
'Data' "MATERIAL_DATA",
"ARTIFACT_DATA",
"NAMECARD_DATA",
"honey_id_to_game_id",
"Data",
] ]
K = TypeVar('K') K = TypeVar("K")
V = TypeVar('V') V = TypeVar("V")
data_dir = PROJECT_ROOT.joinpath('metadata/data/') data_dir = PROJECT_ROOT.joinpath("metadata/data/")
data_dir.mkdir(parents=True, exist_ok=True) data_dir.mkdir(parents=True, exist_ok=True)
_cache = {} _cache = {}
@ -35,15 +40,14 @@ class Data(dict, Generic[K, V]):
if (result := _cache.get(self._file_name)) not in [None, {}]: if (result := _cache.get(self._file_name)) not in [None, {}]:
self._dict = result self._dict = result
else: else:
path = data_dir.joinpath(self._file_name).with_suffix('.json') path = data_dir.joinpath(self._file_name).with_suffix(".json")
if not path.exists(): if not path.exists():
logger.error( logger.error(
f"暂未找到名为 \"{self._file_name}.json\" 的 metadata , " f'暂未找到名为 "{self._file_name}.json" 的 metadata , ' "请先使用 [yellow bold]/refresh_metadata[/] 命令下载",
"请先使用 [yellow bold]/refresh_metadata[/] 命令下载", extra={"markup": True},
extra={'markup': True}
) )
self._dict = {} self._dict = {}
with open(path, encoding='utf-8') as file: with open(path, encoding="utf-8") as file:
self._dict = json.load(file) self._dict = json.load(file)
_cache.update({self._file_name: self._dict}) _cache.update({self._file_name: self._dict})
return self._dict return self._dict
@ -75,14 +79,15 @@ class Data(dict, Generic[K, V]):
return self.data.items() return self.data.items()
HONEY_DATA: dict[str, dict[StrOrInt, list[str | int]]] = Data('honey') HONEY_DATA: dict[str, dict[StrOrInt, list[str | int]]] = Data("honey")
AVATAR_DATA: dict[str, dict[str, int | str | list[int]]] = Data('avatar') AVATAR_DATA: dict[str, dict[str, int | str | list[int]]] = Data("avatar")
WEAPON_DATA: dict[str, dict[str, int | str]] = Data('weapon') WEAPON_DATA: dict[str, dict[str, int | str]] = Data("weapon")
MATERIAL_DATA: dict[str, dict[str, int | str]] = Data('material') MATERIAL_DATA: dict[str, dict[str, int | str]] = Data("material")
ARTIFACT_DATA: dict[str, dict[str, int | str | list[int] | dict[str, str]]] = Data('reliquary') ARTIFACT_DATA: dict[str, dict[str, int | str | list[int] | dict[str, str]]] = Data("reliquary")
NAMECARD_DATA: dict[str, dict[str, int | str]] = Data('namecard') NAMECARD_DATA: dict[str, dict[str, int | str]] = Data("namecard")
@functools.lru_cache()
def honey_id_to_game_id(honey_id: str, item_type: str) -> str | None: def honey_id_to_game_id(honey_id: str, item_type: str) -> str | None:
return next((key for key, value in HONEY_DATA[item_type].items() if value[0] == honey_id), None) return next((key for key, value in HONEY_DATA[item_type].items() if value[0] == honey_id), None)

View File

@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
import functools
from metadata.genshin import WEAPON_DATA from metadata.genshin import WEAPON_DATA
__all__ = [ __all__ = [
@ -183,24 +185,28 @@ weapons = {
# noinspection PyPep8Naming # noinspection PyPep8Naming
@functools.lru_cache()
def roleToName(shortname: str) -> str: def roleToName(shortname: str) -> str:
"""讲角色昵称转为正式名""" """讲角色昵称转为正式名"""
return next((value[0] for value in roles.values() for name in value if name == shortname), shortname) return next((value[0] for value in roles.values() for name in value if name == shortname), shortname)
# noinspection PyPep8Naming # noinspection PyPep8Naming
@functools.lru_cache()
def roleToId(name: str) -> int | None: def roleToId(name: str) -> int | None:
"""获取角色ID""" """获取角色ID"""
return next((key for key, value in roles.items() for n in value if n == name), None) return next((key for key, value in roles.items() for n in value if n == name), None)
# noinspection PyPep8Naming # noinspection PyPep8Naming
@functools.lru_cache()
def weaponToName(shortname: str) -> str: def weaponToName(shortname: str) -> str:
"""讲武器昵称转为正式名""" """讲武器昵称转为正式名"""
return next((key for key, value in weapons.items() if shortname == key or shortname in value), shortname) return next((key for key, value in weapons.items() if shortname == key or shortname in value), shortname)
# noinspection PyPep8Naming # noinspection PyPep8Naming
@functools.lru_cache()
def weaponToId(name: str) -> int | None: def weaponToId(name: str) -> int | None:
"""获取武器ID""" """获取武器ID"""
return next((int(key) for key, value in WEAPON_DATA.items() if weaponToName(name) in value['name']), None) return next((int(key) for key, value in WEAPON_DATA.items() if weaponToName(name) in value['name']), None)

View File

@ -374,29 +374,27 @@ class GachaLog:
count += 1 count += 1
if item.rank_type == "5": if item.rank_type == "5":
if item.item_type == "角色" and pool_name in {"角色祈愿", "常驻祈愿"}: if item.item_type == "角色" and pool_name in {"角色祈愿", "常驻祈愿"}:
result.append( data = {
FiveStarItem( "name": item.name,
name=item.name, "icon": (await assets.avatar(roleToId(item.name)).icon()).as_uri(),
icon=(await assets.avatar(roleToId(item.name)).icon()).as_uri(), "count": count,
count=count, "type": "角色",
type="角色", "isUp": GachaLog.check_avatar_up(item.name, item.time) if pool_name == "角色祈愿" else False,
isUp=GachaLog.check_avatar_up(item.name, item.time) if pool_name == "角色祈愿" else False, "isBig": (not result[-1].isUp) if result and pool_name == "角色祈愿" else False,
isBig=(not result[-1].isUp) if result and pool_name == "角色祈愿" else False, "time": item.time,
time=item.time, }
) result.append(FiveStarItem.construct(**data))
)
elif item.item_type == "武器" and pool_name in {"武器祈愿", "常驻祈愿"}: elif item.item_type == "武器" and pool_name in {"武器祈愿", "常驻祈愿"}:
result.append( data = {
FiveStarItem( "name": item.name,
name=item.name, "icon": (await assets.weapon(weaponToId(item.name)).icon()).as_uri(),
icon=(await assets.weapon(weaponToId(item.name)).icon()).as_uri(), "count": count,
count=count, "type": "武器",
type="武器", "isUp": False,
isUp=False, "isBig": False,
isBig=False, "time": item.time,
time=item.time, }
) result.append(FiveStarItem.construct(**data))
)
count = 0 count = 0
result.reverse() result.reverse()
return result, count return result, count
@ -415,25 +413,23 @@ class GachaLog:
count += 1 count += 1
if item.rank_type == "4": if item.rank_type == "4":
if item.item_type == "角色": if item.item_type == "角色":
result.append( data = {
FourStarItem( "name": item.name,
name=item.name, "icon": (await assets.avatar(roleToId(item.name)).icon()).as_uri(),
icon=(await assets.avatar(roleToId(item.name)).icon()).as_uri(), "count": count,
count=count, "type": "角色",
type="角色", "time": item.time,
time=item.time, }
) result.append(FourStarItem.construct(**data))
)
elif item.item_type == "武器": elif item.item_type == "武器":
result.append( data = {
FourStarItem( "name": item.name,
name=item.name, "icon": (await assets.weapon(weaponToId(item.name)).icon()).as_uri(),
icon=(await assets.weapon(weaponToId(item.name)).icon()).as_uri(), "count": count,
count=count, "type": "武器",
type="武器", "time": item.time,
time=item.time, }
) result.append(FourStarItem.construct(**data))
)
count = 0 count = 0
result.reverse() result.reverse()
return result, count return result, count

View File

@ -22,6 +22,7 @@ from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto
from telegram.constants import ChatAction from telegram.constants import ChatAction
from telegram.ext import CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, filters from telegram.ext import CallbackContext, CallbackQueryHandler, CommandHandler, MessageHandler, filters
from core.base.assets import DEFAULT_EnkaAssets
from core.baseplugin import BasePlugin from core.baseplugin import BasePlugin
from core.config import config from core.config import config
from core.plugin import Plugin, handler from core.plugin import Plugin, handler
@ -38,8 +39,6 @@ from utils.log import logger
from utils.models.base import RegionEnum from utils.models.base import RegionEnum
from utils.patch.aiohttp import AioHttpTimeoutException from utils.patch.aiohttp import AioHttpTimeoutException
assets = Assets(lang="chs")
class PlayerCards(Plugin, BasePlugin): class PlayerCards(Plugin, BasePlugin):
def __init__(self, user_service: UserService = None, template_service: TemplateService = None): def __init__(self, user_service: UserService = None, template_service: TemplateService = None):
@ -82,12 +81,8 @@ class PlayerCards(Plugin, BasePlugin):
except UserNotFoundError: except UserNotFoundError:
reply_message = await message.reply_text("未查询到账号信息,请先私聊派蒙绑定账号") reply_message = await message.reply_text("未查询到账号信息,请先私聊派蒙绑定账号")
if filters.ChatType.GROUPS.filter(message): if filters.ChatType.GROUPS.filter(message):
self._add_delete_message_job( self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id, 30)
context, reply_message.chat_id, reply_message.message_id, 30 self._add_delete_message_job(context, message.chat_id, message.message_id, 30)
)
self._add_delete_message_job(
context, message.chat_id, message.message_id, 30
)
return return
data = await self._fetch_user(uid) data = await self._fetch_user(uid)
if isinstance(data, str): if isinstance(data, str):
@ -98,25 +93,26 @@ class PlayerCards(Plugin, BasePlugin):
return return
if len(args) == 1: if len(args) == 1:
character_name = roleToName(args[0]) character_name = roleToName(args[0])
logger.info( logger.info(f"用户 {user.full_name}[{user.id}] 角色卡片查询命令请求 || character_name[{character_name}] uid[{uid}]")
f"用户 {user.full_name}[{user.id}] 角色卡片查询命令请求 || character_name[{character_name}] uid[{uid}]"
)
else: else:
logger.info(f"用户 {user.full_name}[{user.id}] 角色卡片查询命令请求") logger.info(f"用户 {user.full_name}[{user.id}] 角色卡片查询命令请求")
buttons = [] buttons = []
temp = [] temp = []
for index, value in enumerate(data.characters): for index, value in enumerate(data.characters):
temp.append(InlineKeyboardButton( temp.append(
InlineKeyboardButton(
value.name, value.name,
callback_data=f"get_player_card|{user.id}|{uid}|{value.name}", callback_data=f"get_player_card|{user.id}|{uid}|{value.name}",
)) )
)
if index == 3: if index == 3:
buttons.append(temp) buttons.append(temp)
temp = [] temp = []
if len(temp) > 0: if len(temp) > 0:
buttons.append(temp) buttons.append(temp)
reply_message = await message.reply_photo(photo=self.temp_photo, caption="请选择你要查询的角色", reply_message = await message.reply_photo(
reply_markup=InlineKeyboardMarkup(buttons)) photo=self.temp_photo, caption="请选择你要查询的角色", reply_markup=InlineKeyboardMarkup(buttons)
)
if reply_message.photo: if reply_message.photo:
self.temp_photo = reply_message.photo[-1].file_id self.temp_photo = reply_message.photo[-1].file_id
return return
@ -148,8 +144,7 @@ class PlayerCards(Plugin, BasePlugin):
result, user_id, uid = await get_player_card_callback(callback_query.data) result, user_id, uid = await get_player_card_callback(callback_query.data)
if user.id != user_id: if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" await callback_query.answer(text="这不是你的按钮!\n" "再乱点再按我叫西风骑士团、千岩军、天领奉和教令院了!", show_alert=True)
"再乱点再按我叫西风骑士团、千岩军、天领奉和教令院了!", show_alert=True)
return return
logger.info(f"用户 {user.full_name}[{user.id}] 角色卡片查询命令请求 || character_name[{result}] uid[{uid}]") logger.info(f"用户 {user.full_name}[{user.id}] 角色卡片查询命令请求 || character_name[{result}] uid[{uid}]")
data = await self._fetch_user(uid) data = await self._fetch_user(uid)
@ -190,7 +185,8 @@ class Artifact(BaseModel):
self.score += substat_scores self.score += substat_scores
self.score = round(self.score, 1) self.score = round(self.score, 1)
for r in (("D", 10), for r in (
("D", 10),
("C", 16.5), ("C", 16.5),
("B", 23.1), ("B", 23.1),
("A", 29.7), ("A", 29.7),
@ -198,7 +194,8 @@ class Artifact(BaseModel):
("SS", 42.9), ("SS", 42.9),
("SSS", 49.5), ("SSS", 49.5),
("ACE", 56.1), ("ACE", 56.1),
("ACE²", 66)): ("ACE²", 66),
):
if self.score >= r[1]: if self.score >= r[1]:
self.score_label = r[0] self.score_label = r[0]
self.score_class = self.get_score_class(r[0]) self.score_class = self.get_score_class(r[0])
@ -236,7 +233,8 @@ class RenderTemplate:
artifact_total_score = round(artifact_total_score, 1) artifact_total_score = round(artifact_total_score, 1)
artifact_total_score_label: str = "E" artifact_total_score_label: str = "E"
for r in (("D", 10), for r in (
("D", 10),
("C", 16.5), ("C", 16.5),
("B", 23.1), ("B", 23.1),
("A", 29.7), ("A", 29.7),
@ -244,7 +242,8 @@ class RenderTemplate:
("SS", 42.9), ("SS", 42.9),
("SSS", 49.5), ("SSS", 49.5),
("ACE", 56.1), ("ACE", 56.1),
("ACE²", 66)): ("ACE²", 66),
):
if artifact_total_score / 5 >= r[1]: if artifact_total_score / 5 >= r[1]:
artifact_total_score_label = r[0] artifact_total_score_label = r[0]
@ -262,7 +261,6 @@ class RenderTemplate:
# 圣遗物评级颜色 # 圣遗物评级颜色
"artifact_total_score_class": Artifact.get_score_class(artifact_total_score_label), "artifact_total_score_class": Artifact.get_score_class(artifact_total_score_label),
"artifacts": artifacts, "artifacts": artifacts,
# 需要在模板中使用的 enum 类型 # 需要在模板中使用的 enum 类型
"DigitType": DigitType, "DigitType": DigitType,
} }
@ -278,7 +276,7 @@ class RenderTemplate:
data, data,
{"width": 950, "height": 1080}, {"width": 950, "height": 1080},
full_page=True, full_page=True,
query_selector=".text-neutral-200" query_selector=".text-neutral-200",
) )
async def de_stats(self) -> List[Tuple[str, Any]]: async def de_stats(self) -> List[Tuple[str, Any]]:
@ -317,14 +315,10 @@ class RenderTemplate:
pass pass
elif stat[1].id != 26: # 治疗加成 elif stat[1].id != 26: # 治疗加成
continue continue
value = ( value = stat[1].to_rounded() if isinstance(stat[1], Stats) else stat[1].to_percentage_symbol()
stat[1].to_rounded()
if isinstance(stat[1], Stats)
else stat[1].to_percentage_symbol()
)
if value in ("0%", 0): if value in ("0%", 0):
continue continue
name = assets.get_hash_map(stat[0]) name = DEFAULT_EnkaAssets.get_hash_map(stat[0])
if name is None: if name is None:
continue continue
items.append((name, value)) items.append((name, value))