mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-22 06:17:54 +00:00
439 lines
17 KiB
Python
439 lines
17 KiB
Python
import asyncio
|
|
from pathlib import Path
|
|
from ssl import SSLZeroReturnError
|
|
from typing import Optional, List, Dict
|
|
|
|
from aiofiles import open as async_open
|
|
from httpx import AsyncClient, HTTPError
|
|
|
|
from core.base_service import BaseService
|
|
from modules.wiki.base import WikiModel
|
|
from modules.wiki.models.avatar_config import AvatarIcon
|
|
from modules.wiki.models.head_icon import HeadIcon
|
|
from modules.wiki.models.light_cone_config import LightConeIcon
|
|
from modules.wiki.models.phone_theme import PhoneTheme
|
|
from utils.const import PROJECT_ROOT
|
|
from utils.log import logger
|
|
from utils.typedefs import StrOrURL, StrOrInt
|
|
|
|
ASSETS_PATH = PROJECT_ROOT.joinpath("resources/assets")
|
|
ASSETS_PATH.mkdir(exist_ok=True, parents=True)
|
|
DATA_MAP = {
|
|
"avatar": WikiModel.BASE_URL + "avatar_icons.json",
|
|
"light_cone": WikiModel.BASE_URL + "light_cone_icons.json",
|
|
"avatar_eidolon": WikiModel.BASE_URL + "avatar_eidolon_icons.json",
|
|
"avatar_skill": WikiModel.BASE_URL + "skill/info.json",
|
|
"head_icon": WikiModel.BASE_URL + "head_icons.json",
|
|
"phone_theme": WikiModel.BASE_URL + "phone_themes.json",
|
|
}
|
|
|
|
|
|
class AssetsServiceError(Exception):
|
|
pass
|
|
|
|
|
|
class AssetsCouldNotFound(AssetsServiceError):
|
|
def __init__(self, message: str, target: str):
|
|
self.message = message
|
|
self.target = target
|
|
super().__init__(f"{message}: target={target}")
|
|
|
|
|
|
class _AssetsService:
|
|
client: Optional[AsyncClient] = None
|
|
|
|
def __init__(self, client: Optional[AsyncClient] = None) -> None:
|
|
self.client = client
|
|
|
|
async def _download(self, url: StrOrURL, path: Path, retry: int = 5) -> Optional[Path]:
|
|
"""从 url 下载图标至 path"""
|
|
if not url:
|
|
return None
|
|
logger.debug("正在从 %s 下载图标至 %s", url, path)
|
|
headers = None
|
|
for time in range(retry):
|
|
try:
|
|
response = await self.client.get(url, follow_redirects=False, headers=headers)
|
|
except Exception as error: # pylint: disable=W0703
|
|
if not isinstance(error, (HTTPError, SSLZeroReturnError)):
|
|
logger.error(error) # 打印未知错误
|
|
if time != retry - 1: # 未达到重试次数
|
|
await asyncio.sleep(1)
|
|
else:
|
|
raise error
|
|
continue
|
|
if response.status_code != 200: # 判定页面是否正常
|
|
return None
|
|
async with async_open(path, "wb") as file:
|
|
await file.write(response.content) # 保存图标
|
|
return path.resolve()
|
|
|
|
|
|
class _AvatarAssets(_AssetsService):
|
|
path: Path
|
|
data: List[AvatarIcon]
|
|
name_map: Dict[str, AvatarIcon]
|
|
id_map: Dict[int, AvatarIcon]
|
|
|
|
def __init__(self, client: Optional[AsyncClient] = None) -> None:
|
|
super().__init__(client)
|
|
self.path = ASSETS_PATH.joinpath("avatar")
|
|
self.path.mkdir(exist_ok=True, parents=True)
|
|
|
|
async def initialize(self):
|
|
logger.info("正在初始化角色素材图标")
|
|
html = await self.client.get(DATA_MAP["avatar"])
|
|
eidolons = await self.client.get(DATA_MAP["avatar_eidolon"])
|
|
eidolons_data = eidolons.json()
|
|
skills = await self.client.get(DATA_MAP["avatar_skill"])
|
|
skills_data = skills.json()
|
|
self.data = [AvatarIcon(**data) for data in html.json()]
|
|
self.name_map = {icon.name: icon for icon in self.data}
|
|
self.id_map = {icon.id: icon for icon in self.data}
|
|
tasks = []
|
|
for icon in self.data:
|
|
eidolons_s_data = eidolons_data.get(str(icon.id), [])
|
|
skills_s_data = [f"{i}.png" for i in skills_data if i.startswith(str(icon.id) + "_")]
|
|
base_path = self.path / f"{icon.id}"
|
|
base_path.mkdir(exist_ok=True, parents=True)
|
|
gacha_path = base_path / "gacha.webp"
|
|
icon_path = base_path / "icon.webp"
|
|
normal_path = base_path / "normal.webp"
|
|
square_path = base_path / "square.png"
|
|
eidolons_paths = [(base_path / f"eidolon_{eidolon_id}.webp") for eidolon_id in range(1, 7)]
|
|
skills_paths = []
|
|
for i in skills_s_data:
|
|
temp_end = "_".join(i.split("_")[1:])
|
|
skills_paths.append(base_path / f"skill_{temp_end}")
|
|
if not gacha_path.exists():
|
|
tasks.append(self._download(icon.gacha, gacha_path))
|
|
if not icon_path.exists():
|
|
tasks.append(self._download(icon.icon_, icon_path))
|
|
if not normal_path.exists():
|
|
tasks.append(self._download(icon.normal, normal_path))
|
|
if not square_path.exists() and icon.square:
|
|
tasks.append(self._download(icon.square, square_path))
|
|
for index, eidolon in enumerate(eidolons_paths):
|
|
if not eidolons_s_data:
|
|
break
|
|
if not eidolon.exists():
|
|
tasks.append(self._download(eidolons_s_data[index], eidolon))
|
|
for index, skill in enumerate(skills_paths):
|
|
if not skill.exists():
|
|
tasks.append(self._download(WikiModel.BASE_URL + "skill/" + skills_s_data[index], skill))
|
|
if len(tasks) >= 100:
|
|
await asyncio.gather(*tasks)
|
|
tasks = []
|
|
if tasks:
|
|
await asyncio.gather(*tasks)
|
|
logger.info("角色素材图标初始化完成")
|
|
|
|
def get_path(self, icon: AvatarIcon, name: str, ext: str = "webp") -> Path:
|
|
path = self.path / f"{icon.id}"
|
|
path.mkdir(exist_ok=True, parents=True)
|
|
return path / f"{name}.{ext}"
|
|
|
|
def get_by_id(self, id_: int) -> Optional[AvatarIcon]:
|
|
return self.id_map.get(id_, None)
|
|
|
|
def get_by_name(self, name: str) -> Optional[AvatarIcon]:
|
|
return self.name_map.get(name, None)
|
|
|
|
def get_target(self, target: StrOrInt, second_target: StrOrInt = None) -> AvatarIcon:
|
|
data = None
|
|
if isinstance(target, int):
|
|
data = self.get_by_id(target)
|
|
elif isinstance(target, str):
|
|
data = self.get_by_name(target)
|
|
if data is None:
|
|
if second_target:
|
|
return self.get_target(second_target)
|
|
raise AssetsCouldNotFound("角色素材图标不存在", target)
|
|
return data
|
|
|
|
def gacha(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "gacha")
|
|
|
|
def icon(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "icon")
|
|
|
|
def normal(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "normal")
|
|
|
|
def square(self, target: StrOrInt, second_target: StrOrInt = None, allow_icon: bool = True) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
path = self.get_path(icon, "square", "png")
|
|
if not path.exists():
|
|
if allow_icon:
|
|
return self.get_path(icon, "icon")
|
|
raise AssetsCouldNotFound("角色素材图标不存在", target)
|
|
return path
|
|
|
|
def eidolons(self, target: StrOrInt, second_target: StrOrInt = None) -> List[Path]:
|
|
"""星魂"""
|
|
icon = self.get_target(target, second_target)
|
|
return [self.get_path(icon, f"eidolon_{i}") for i in range(1, 7)]
|
|
|
|
def skill_basic_atk(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
"""普攻 001"""
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "skill_basic_atk", "png")
|
|
|
|
def skill_skill(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
"""战技 002"""
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "skill_skill", "png")
|
|
|
|
def skill_ultimate(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
"""终结技 003"""
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "skill_ultimate", "png")
|
|
|
|
def skill_talent(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
"""天赋 004"""
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "skill_talent", "png")
|
|
|
|
def skill_technique(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
"""秘技 007"""
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "skill_technique", "png")
|
|
|
|
def skills(self, target: StrOrInt, second_target: StrOrInt = None) -> List[Path]:
|
|
icon = self.get_target(target, second_target)
|
|
return [
|
|
self.get_path(icon, "skill_basic_atk", "png"),
|
|
self.get_path(icon, "skill_skill", "png"),
|
|
self.get_path(icon, "skill_ultimate", "png"),
|
|
self.get_path(icon, "skill_talent", "png"),
|
|
self.get_path(icon, "skill_technique", "png"),
|
|
]
|
|
|
|
|
|
class _LightConeAssets(_AssetsService):
|
|
path: Path
|
|
data: List[LightConeIcon]
|
|
name_map: Dict[str, LightConeIcon]
|
|
id_map: Dict[int, LightConeIcon]
|
|
|
|
def __init__(self, client: Optional[AsyncClient] = None) -> None:
|
|
super().__init__(client)
|
|
self.path = ASSETS_PATH.joinpath("light_cone")
|
|
self.path.mkdir(exist_ok=True, parents=True)
|
|
|
|
async def initialize(self):
|
|
logger.info("正在初始化光锥素材图标")
|
|
html = await self.client.get(DATA_MAP["light_cone"])
|
|
self.data = [LightConeIcon(**data) for data in html.json()]
|
|
self.name_map = {icon.name: icon for icon in self.data}
|
|
self.id_map = {icon.id: icon for icon in self.data}
|
|
tasks = []
|
|
for icon in self.data:
|
|
base_path = self.path / f"{icon.id}"
|
|
base_path.mkdir(exist_ok=True, parents=True)
|
|
gacha_path = base_path / "gacha.webp"
|
|
icon_path = base_path / "icon.webp"
|
|
if not gacha_path.exists():
|
|
tasks.append(self._download(icon.gacha, gacha_path))
|
|
if not icon_path.exists():
|
|
tasks.append(self._download(icon.icon_, icon_path))
|
|
if len(tasks) >= 100:
|
|
await asyncio.gather(*tasks)
|
|
tasks = []
|
|
if tasks:
|
|
await asyncio.gather(*tasks)
|
|
logger.info("光锥素材图标初始化完成")
|
|
|
|
def get_path(self, icon: LightConeIcon, name: str) -> Path:
|
|
path = self.path / f"{icon.id}"
|
|
path.mkdir(exist_ok=True, parents=True)
|
|
return path / f"{name}.webp"
|
|
|
|
def get_by_id(self, id_: int) -> Optional[LightConeIcon]:
|
|
return self.id_map.get(id_, None)
|
|
|
|
def get_by_name(self, name: str) -> Optional[LightConeIcon]:
|
|
return self.name_map.get(name, None)
|
|
|
|
def get_target(self, target: StrOrInt, second_target: StrOrInt = None) -> Optional[LightConeIcon]:
|
|
if isinstance(target, int):
|
|
return self.get_by_id(target)
|
|
elif isinstance(target, str):
|
|
return self.get_by_name(target)
|
|
if second_target:
|
|
return self.get_target(second_target)
|
|
raise AssetsCouldNotFound("光锥素材图标不存在", target)
|
|
|
|
def gacha(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "gacha")
|
|
|
|
def icon(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "icon")
|
|
|
|
|
|
class _HeadIconAssets(_AssetsService):
|
|
path: Path
|
|
data: List[HeadIcon]
|
|
id_map: Dict[int, HeadIcon]
|
|
avatar_id_map: Dict[int, HeadIcon]
|
|
|
|
def __init__(self, client: Optional[AsyncClient] = None) -> None:
|
|
super().__init__(client)
|
|
self.path = ASSETS_PATH.joinpath("head_icon")
|
|
self.path.mkdir(exist_ok=True, parents=True)
|
|
|
|
async def initialize(self):
|
|
logger.info("正在初始化头像素材图标")
|
|
html = await self.client.get(DATA_MAP["head_icon"])
|
|
self.data = [HeadIcon(**data) for data in html.json()]
|
|
self.id_map = {icon.id: icon for icon in self.data}
|
|
self.avatar_id_map = {icon.avatar_id: icon for icon in self.data if icon.avatar_id}
|
|
tasks = []
|
|
for icon in self.data:
|
|
webp_path = self.path / f"{icon.id}.webp"
|
|
png_path = self.path / f"{icon.id}.png"
|
|
if not webp_path.exists() and icon.webp:
|
|
tasks.append(self._download(icon.webp, webp_path))
|
|
if not png_path.exists() and icon.png:
|
|
tasks.append(self._download(icon.png, png_path))
|
|
if len(tasks) >= 100:
|
|
await asyncio.gather(*tasks)
|
|
tasks = []
|
|
if tasks:
|
|
await asyncio.gather(*tasks)
|
|
logger.info("头像素材图标初始化完成")
|
|
|
|
def get_path(self, icon: HeadIcon, ext: str) -> Path:
|
|
path = self.path / f"{icon.id}.{ext}"
|
|
return path
|
|
|
|
def get_by_id(self, id_: int) -> Optional[HeadIcon]:
|
|
return self.id_map.get(id_, None)
|
|
|
|
def get_by_avatar_id(self, avatar_id: int) -> Optional[HeadIcon]:
|
|
return self.avatar_id_map.get(avatar_id, None)
|
|
|
|
def get_target(self, target: StrOrInt, second_target: StrOrInt = None) -> Optional[HeadIcon]:
|
|
if target and 1000 < target <= 9000:
|
|
data = self.get_by_avatar_id(target)
|
|
if data:
|
|
return data
|
|
data = self.get_by_id(target)
|
|
if data:
|
|
return data
|
|
if second_target:
|
|
return self.get_target(second_target)
|
|
raise AssetsCouldNotFound("头像素材图标不存在", target)
|
|
|
|
def webp(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "webp")
|
|
|
|
def png(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
return self.get_path(icon, "png")
|
|
|
|
def icon(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
icon = self.get_target(target, second_target)
|
|
webp_path = self.get_path(icon, "webp")
|
|
png_path = self.get_path(icon, "png")
|
|
if webp_path.exists():
|
|
return webp_path
|
|
if png_path.exists():
|
|
return png_path
|
|
raise AssetsCouldNotFound("头像素材图标不存在", target)
|
|
|
|
|
|
class _PhoneThemeAssets(_AssetsService):
|
|
path: Path
|
|
data: List[PhoneTheme]
|
|
id_map: Dict[int, PhoneTheme]
|
|
|
|
def __init__(self, client: Optional[AsyncClient] = None) -> None:
|
|
super().__init__(client)
|
|
self.path = ASSETS_PATH.joinpath("phone_theme")
|
|
self.path.mkdir(exist_ok=True, parents=True)
|
|
|
|
async def initialize(self):
|
|
logger.info("正在初始化手机壁纸素材图标")
|
|
html = await self.client.get(DATA_MAP["phone_theme"])
|
|
self.data = [PhoneTheme(**data) for data in html.json()]
|
|
self.id_map = {theme.id: theme for theme in self.data}
|
|
tasks = []
|
|
for theme in self.data:
|
|
path = self.path / f"{theme.id}.png"
|
|
if not path.exists():
|
|
if theme.urls[0]:
|
|
tasks.append(self._download(theme.urls[0], path))
|
|
elif theme.urls[1]:
|
|
tasks.append(self._download(theme.urls[1], path))
|
|
if len(tasks) >= 100:
|
|
await asyncio.gather(*tasks)
|
|
tasks = []
|
|
if tasks:
|
|
await asyncio.gather(*tasks)
|
|
logger.info("手机壁纸素材图标初始化完成")
|
|
|
|
def get_path(self, theme: PhoneTheme, ext: str) -> Path:
|
|
path = self.path / f"{theme.id}.{ext}"
|
|
return path
|
|
|
|
def get_by_id(self, id_: int) -> Optional[PhoneTheme]:
|
|
return self.id_map.get(id_, None)
|
|
|
|
def get_target(self, target: StrOrInt, second_target: StrOrInt = None) -> Optional[PhoneTheme]:
|
|
data = self.get_by_id(target)
|
|
if data:
|
|
return data
|
|
if second_target:
|
|
return self.get_target(second_target)
|
|
raise AssetsCouldNotFound("手机壁纸素材图标不存在", target)
|
|
|
|
def icon(self, target: StrOrInt, second_target: StrOrInt = None) -> Path:
|
|
theme = self.get_target(target, second_target)
|
|
png_path = self.get_path(theme, "png")
|
|
if png_path.exists():
|
|
return png_path
|
|
raise AssetsCouldNotFound("手机壁纸素材图标不存在", target)
|
|
|
|
|
|
class AssetsService(BaseService.Dependence):
|
|
"""asset服务
|
|
|
|
用于储存和管理 asset :
|
|
当对应的 asset (如某角色图标)不存在时,该服务会先查找本地。
|
|
若本地不存在,则从网络上下载;若存在,则返回其路径
|
|
"""
|
|
|
|
client: Optional[AsyncClient] = None
|
|
|
|
avatar: _AvatarAssets
|
|
"""角色"""
|
|
|
|
head_icon: _HeadIconAssets
|
|
"""头像"""
|
|
|
|
phone_theme: _PhoneThemeAssets
|
|
"""手机壁纸"""
|
|
|
|
light_cone: _LightConeAssets
|
|
"""光锥"""
|
|
|
|
def __init__(self):
|
|
self.client = AsyncClient(timeout=60.0)
|
|
self.avatar = _AvatarAssets(self.client)
|
|
self.head_icon = _HeadIconAssets(self.client)
|
|
self.phone_theme = _PhoneThemeAssets(self.client)
|
|
self.light_cone = _LightConeAssets(self.client)
|
|
|
|
async def initialize(self): # pylint: disable=W0221
|
|
await self.avatar.initialize()
|
|
await self.head_icon.initialize()
|
|
await self.phone_theme.initialize()
|
|
await self.light_cone.initialize()
|