diff --git a/metadata/shortname.py b/metadata/shortname.py index 23e6296e..888dc5f9 100644 --- a/metadata/shortname.py +++ b/metadata/shortname.py @@ -13,6 +13,8 @@ __all__ = [ "roleToName", "weaponToName", "weaponToId", + "elementToName", + "elementsToColor", "not_real_roles", "roleToTag", ] @@ -744,6 +746,33 @@ weapons = { "水仙十字之剑": ["水仙", "水仙十字剑"], "静水流涌之辉": ["静水", "净水流涌之辉", "水神专武", "芙芙专武"], } +elements = { + "pyro": ["火"], + "hydro": ["水"], + "anemo": ["风"], + "cryo": ["冰"], + "electro": ["雷"], + "geo": ["岩"], + "dendro": ["草"], + "physical": ["物理"], +} +elementsToColor = { + "anemo": "#65B89A", + "geo": "#F6A824", + "electro": "#9F79B5", + "dendro": "#97C12B", + "hydro": "#3FB6ED", + "pyro": "#E76429", + "cryo": "#8FCDDC", + "physical": "#15161B", +} + + +@functools.lru_cache() +def elementToName(elem: str) -> str | None: + """将元素昵称转为正式名""" + elem = str.casefold(elem) # 忽略大小写 + return elements[elem][0] if elem in elements else None # noinspection PyPep8Naming diff --git a/modules/apihelper/client/components/remote.py b/modules/apihelper/client/components/remote.py index c307f274..1b15701c 100644 --- a/modules/apihelper/client/components/remote.py +++ b/modules/apihelper/client/components/remote.py @@ -15,6 +15,7 @@ class Remote: MATERIAL = f"{BASE_URL}roles_material.json" RULE = f"{RESOURCE_FightPropRule_URL}FightPropRule_genshin.json" DAMAGE = f"{RESOURCE_FightPropRule_URL}GenshinDamageRule.json" + GCSIM = f"{RESOURCE_FightPropRule_URL}gcsim.json" @staticmethod async def get_remote_calendar() -> Dict[str, Dict]: @@ -80,3 +81,16 @@ class Remote: except Exception as exc: # skipcq: PYL-W0703 logger.error("获取云端伤害计算规则失败: %s", exc_info=exc) return {} + + @staticmethod + async def get_gcsim_scripts() -> Dict[str, str]: + """获取云端 gcsim 脚本""" + try: + async with AsyncClient() as client: + req = await client.get(Remote.GCSIM) + if req.status_code == 200: + return req.json() + return {} + except Exception as exc: # skipcq: PYL-W0703 + logger.error("获取云端 gcsim 脚本失败: %s", exc_info=exc) + return {} diff --git a/modules/gcsim/cache.py b/modules/gcsim/cache.py new file mode 100644 index 00000000..d4f00e19 --- /dev/null +++ b/modules/gcsim/cache.py @@ -0,0 +1,28 @@ +from typing import Optional + +from core.dependence.redisdb import RedisDB + +__all__ = [ + "GCSimCache", +] + + +class GCSimCache: + qname: str = "gcsim:" + + def __init__(self, redis: RedisDB, ttl: int = 24 * 60 * 60): + self.client = redis.client + self.ttl = ttl + + def get_key(self, player_id: str, script_hash: int) -> str: + return f"{self.qname}:{player_id}:{script_hash}" + + async def set_cache(self, player_id: str, script_hash: int, file_id: str) -> None: + key = self.get_key(player_id, script_hash) + await self.client.set(key, file_id, ex=self.ttl) + + async def get_cache(self, player_id: str, script_hash: int) -> Optional[str]: + key = self.get_key(player_id, script_hash) + data = await self.client.get(key) + if data: + return data.decode() diff --git a/modules/gcsim/file.py b/modules/gcsim/file.py new file mode 100644 index 00000000..2deda861 --- /dev/null +++ b/modules/gcsim/file.py @@ -0,0 +1,66 @@ +import json +import asyncio +import os +from pathlib import Path +from typing import Union + +import aiofiles + +from utils.const import DATA_DIR + + +PLAYER_SCRIPTS_PATH = DATA_DIR / "gcsim" +PLAYER_SCRIPTS_PATH.mkdir(parents=True, exist_ok=True) + + +class PlayerGCSimScripts: + _lock = asyncio.Lock() + + def __init__(self, player_scripts_path: Path = PLAYER_SCRIPTS_PATH): + self.player_scripts_path = player_scripts_path + + def get_player_path(self, uid: Union[str, int]): + player_path = self.player_scripts_path.joinpath(str(uid)) + player_path.mkdir(parents=True, exist_ok=True) + return player_path + + def get_script_path(self, uid: Union[str, int], script_key: str): + scripts_path = self.get_player_path(str(uid)).joinpath("scripts") + scripts_path.mkdir(parents=True, exist_ok=True) + return scripts_path.joinpath(f"{script_key}.txt") + + def get_result_path(self, uid: Union[str, int], script_key: str): + scripts_path = self.get_player_path(uid).joinpath("results") + scripts_path.mkdir(parents=True, exist_ok=True) + return scripts_path.joinpath(f"{script_key}.json") + + def get_fits_path(self, uid: Union[str, int]): + return self.get_player_path(uid).joinpath("fits.json") + + def get_fits(self, uid: Union[str, int]) -> list[dict]: + if self.get_fits_path(uid).exists(): + return json.loads(self.get_fits_path(uid).read_text(encoding="utf-8")) + return [] + + def remove_fits(self, uid: Union[str, int]): + self.get_fits_path(uid).unlink(missing_ok=True) + + def clear_fits(self): + if self.player_scripts_path.exists(): + for root, _, files in os.walk(self.player_scripts_path): + for file in files: + if file == "fits.json": + os.remove(os.path.join(root, file)) + + async def write_script( + self, + uid: Union[str, int], + script_key: str, + script: str, + ): + async with self._lock, aiofiles.open(self.get_script_path(uid, script_key), "w", encoding="utf-8") as f: + await f.write(script) + + async def write_fits(self, uid: Union[str, int], fits: list[dict]): + async with self._lock, aiofiles.open(self.get_fits_path(uid), "w", encoding="utf-8") as f: + await f.write(json.dumps(fits, ensure_ascii=False, indent=4)) diff --git a/plugins/admin/set_command.py b/plugins/admin/set_command.py index 7df3e6df..f63209c2 100644 --- a/plugins/admin/set_command.py +++ b/plugins/admin/set_command.py @@ -60,6 +60,7 @@ class SetCommandPlugin(Plugin): BotCommand("stats", "玩家统计查询"), BotCommand("player_card", "查询角色卡片"), BotCommand("avatar_board", "角色排名"), + BotCommand("gcsim", "组队伤害计算"), # Cookie 查询类 BotCommand("dailynote", "查询实时便笺"), BotCommand("ledger", "查询当月旅行札记"), @@ -80,6 +81,7 @@ class SetCommandPlugin(Plugin): BotCommand("sign_all", "全部账号重新签到"), BotCommand("send_log", "发送日志"), BotCommand("update", "更新"), + BotCommand("set_command", "重设命令"), ] await context.bot.set_my_commands(commands=group_command) # 留空,default 为 botCommandScopeDefault, 所有聊天可见 await context.bot.set_my_commands(commands=user_command + group_command, scope=BotCommandScopeAllPrivateChats()) diff --git a/plugins/genshin/gcsim/plugin.py b/plugins/genshin/gcsim/plugin.py new file mode 100644 index 00000000..6d0d85e1 --- /dev/null +++ b/plugins/genshin/gcsim/plugin.py @@ -0,0 +1,298 @@ +import copy +from typing import Optional, TYPE_CHECKING, List, Union, Dict + +from enkanetwork import EnkaNetworkResponse +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Message +from telegram.ext import filters +from telegram.helpers import create_deep_linked_url + +from core.config import config +from core.dependence.assets import AssetsService +from core.dependence.redisdb import RedisDB +from core.plugin import Plugin, handler +from core.services.players import PlayersService +from gram_core.services.template.services import TemplateService +from modules.gcsim.file import PlayerGCSimScripts +from modules.playercards.file import PlayerCardsFile +from plugins.genshin.gcsim.renderer import GCSimResultRenderer +from plugins.genshin.gcsim.runner import GCSimRunner, GCSimFit +from plugins.genshin.model.base import CharacterInfo +from plugins.genshin.model.converters.enka import EnkaConverter +from utils.log import logger + +if TYPE_CHECKING: + from telegram.ext import ContextTypes + +__all__ = ("GCSimPlugin",) + + +async def _no_account_return(message: Message, context: "ContextTypes.DEFAULT_TYPE"): + buttons = [ + [ + InlineKeyboardButton( + "点我绑定账号", + url=create_deep_linked_url(context.bot.username, "set_uid"), + ) + ] + ] + await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons)) + + +async def _no_character_return(user_id: int, uid: int, message: Message): + photo = open("resources/img/kitsune.png", "rb") + buttons = [ + [ + InlineKeyboardButton( + "更新面板", + callback_data=f"update_player_card|{user_id}|{uid}", + ) + ] + ] + await message.reply_photo( + photo=photo, + caption="角色列表未找到,请尝试点击下方按钮从 Enka.Network 更新角色列表", + reply_markup=InlineKeyboardMarkup(buttons), + ) + + +class GCSimPlugin(Plugin): + def __init__( + self, + assets_service: AssetsService, + player_service: PlayersService, + template_service: TemplateService, + redis: RedisDB = None, + ): + self.player_service = player_service + self.player_cards_file = PlayerCardsFile() + self.player_gcsim_scripts = PlayerGCSimScripts() + self.gcsim_runner = GCSimRunner(redis) + self.gcsim_renderer = GCSimResultRenderer(assets_service, template_service) + self.scripts_per_page = 8 + + async def initialize(self): + await self.gcsim_runner.initialize() + + def _gen_buttons( + self, user_id: int, uid: int, fits: List[GCSimFit], page: int = 1 + ) -> List[List[InlineKeyboardButton]]: + buttons = [] + for fit in fits[(page - 1) * self.scripts_per_page : page * self.scripts_per_page]: + button = InlineKeyboardButton( + f"{fit.script_key} ({','.join(map(str, fit.characters))})", + callback_data=f"enqueue_gcsim|{user_id}|{uid}|{fit.script_key}", + ) + if not buttons or len(buttons[-1]) >= 1: + buttons.append([]) + buttons[-1].append(button) + buttons.append( + [ + InlineKeyboardButton("上一页", callback_data=f"gcsim_page|{user_id}|{uid}|{page - 1}") + if page > 1 + else InlineKeyboardButton("更新配队", callback_data=f"gcsim_refresh|{user_id}|{uid}"), + InlineKeyboardButton( + f"{page}/{int(len(fits) / self.scripts_per_page) + 1}", + callback_data=f"gcsim_unclickable|{user_id}|{uid}|unclickable", + ), + InlineKeyboardButton("下一页", callback_data=f"gcsim_page|{user_id}|{uid}|{page + 1}") + if page < int(len(fits) / self.scripts_per_page) + 1 + else InlineKeyboardButton( + "更新配队", + callback_data=f"gcsim_refresh|{user_id}|{uid}", + ), + ] + ) + return buttons + + async def _get_uid(self, user_id: int, args: List[str], reply: Optional["Message"]) -> Optional[int]: + """通过消息获取 uid,优先级:args > reply > self""" + uid, user_id_ = None, user_id + if args: + for i in args: + if i is not None and i.isdigit() and len(i) == 9: + uid = int(i) + if reply: + try: + user_id_ = reply.from_user.id + except AttributeError: + pass + if not uid: + player_info = await self.player_service.get_player(user_id_) + if player_info is not None: + uid = player_info.player_id + if (not uid) and (user_id_ != user_id): + player_info = await self.player_service.get_player(user_id) + if player_info is not None: + uid = player_info.player_id + return uid + + @staticmethod + def _fix_skill_level(data: Dict) -> Dict: + for i in data["avatarInfoList"]: + if "proudSkillExtraLevelMap" in i: + del i["proudSkillExtraLevelMap"] + return data + + async def _load_characters(self, uid: Union[int, str]) -> List[CharacterInfo]: + original_data = await self.player_cards_file.load_history_info(uid) + if original_data is None: + return [] + if original_data.get("avatarInfoList") is None: + original_data["avatarInfoList"] = [] + if len(original_data["avatarInfoList"]) == 0: + return [] + enka_response: EnkaNetworkResponse = EnkaNetworkResponse.parse_obj( + self._fix_skill_level(copy.deepcopy(original_data)) + ) + character_infos = [] + for avatar_info in enka_response.characters: + try: + character_infos.append(EnkaConverter.to_character_info(avatar_info)) + except ValueError as e: + logger.error("无法解析 Enka.Network 角色信息: %s\n%s", e, avatar_info.json()) + return character_infos + + @handler.command(command="gcsim", block=False) + async def gcsim(self, update: Update, context: "ContextTypes.DEFAULT_TYPE"): + user = update.effective_user + message = update.effective_message + args = self.get_args(context) + if not self.gcsim_runner.initialized: + await message.reply_text("GCSim 未初始化,请稍候再试或重启派蒙") + return + if context.user_data.get("overlapping", False): + reply = await message.reply_text("旅行者已经有脚本正在运行,请让派蒙稍微休息一下") + if filters.ChatType.GROUPS.filter(message): + self.add_delete_message_job(reply) + self.add_delete_message_job(message) + return + logger.info("用户 %s[%s] 发出 gcsim 命令", user.full_name, user.id) + + uid = await self._get_uid(user.id, args, message.reply_to_message) + if uid is None: + return await _no_account_return(message, context) + + character_infos = await self._load_characters(uid) + if not character_infos: + return await _no_character_return(user.id, uid, message) + + fits = await self.gcsim_runner.get_fits(uid) + if not fits: + fits = await self.gcsim_runner.calculate_fits(uid, character_infos) + if not fits: + await message.reply_text("好像没有找到适合旅行者的配队呢,要不更新下面板吧") + return + buttons = self._gen_buttons(user.id, uid, fits) + await message.reply_text( + "请选择 GCSim 脚本", + reply_markup=InlineKeyboardMarkup(buttons), + ) + + @handler.callback_query(pattern=r"^gcsim_refresh\|", block=False) + async def gcsim_refresh(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + message = callback_query.message + + user_id, uid = map(int, callback_query.data.split("|")[1:]) + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + + character_infos = await self._load_characters(uid) + if not character_infos: + return await _no_character_return(user.id, uid, message) + + await self.gcsim_runner.remove_fits(uid) + fits = await self.gcsim_runner.calculate_fits(uid, character_infos) + if not fits: + await callback_query.edit_message_text("好像没有找到适合旅行者的配队呢,要不更新下面板吧") + return + buttons = self._gen_buttons(user.id, uid, fits) + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + + @handler.callback_query(pattern=r"^gcsim_page\|", block=False) + async def gcsim_page(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + message = callback_query.message + + user_id, uid, page = map(int, callback_query.data.split("|")[1:]) + if user.id != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + + fits = await self.gcsim_runner.get_fits(uid) + if not fits: + await callback_query.answer(text="其他数据好像被派蒙吃掉了,要不重新试试吧", show_alert=True) + await message.delete() + return + buttons = self._gen_buttons(user_id, uid, fits, page) + await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) + + @handler.callback_query(pattern=r"^gcsim_unclickable\|", block=False) + async def gcsim_unclickable(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + + _, _, _, reason = callback_query.data.split("|") + await callback_query.answer( + text="已经是第一页了!\n" + if reason == "first_page" + else "已经是最后一页了!\n" + if reason == "last_page" + else "这个按钮不可用\n" + config.notice.user_mismatch, + show_alert=True, + ) + + @handler.callback_query(pattern=r"^enqueue_gcsim\|", block=False) + async def enqueue_gcsim(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: + callback_query = update.callback_query + user = callback_query.from_user + message = callback_query.message + user_id, uid, script_key = callback_query.data.split("|")[1:] + msg_to_reply = message + if message.reply_to_message: + msg_to_reply = message.reply_to_message + logger.info("用户 %s[%s] GCSim运行请求 || %s", user.full_name, user.id, callback_query.data) + if str(user.id) != user_id: + await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) + return + + logger.info("用户 %s[%s] enqueue_gcsim 运行请求 || %s", user.full_name, user.id, callback_query.data) + character_infos = await self._load_characters(uid) + if not character_infos: + return await _no_character_return(user.id, uid, message) + + await callback_query.edit_message_text("GCSim 运行中...", reply_markup=InlineKeyboardMarkup([])) + result = await self.gcsim_runner.run(user_id, uid, script_key, character_infos) + if result.error: + await callback_query.edit_message_text(result.error) + else: + await callback_query.edit_message_text(f"GCSim {result.script_key} 运行完成") + if result.file_id: + await msg_to_reply.reply_photo(result.file_id, caption=f"GCSim {script_key} 运行结果") + self.add_delete_message_job(message, delay=1) + return + + result_path = self.player_gcsim_scripts.get_result_path(uid, script_key) + if not result_path.exists(): + await callback_query.answer(text="运行结果似乎在提瓦特之外,派蒙找不到了", show_alert=True) + return + if result.script is None: + await callback_query.answer(text="脚本似乎在提瓦特之外,派蒙找不到了", show_alert=True) + return + + result_ = await self.gcsim_renderer.prepare_result(result_path, result.script, character_infos) + if not result_: + await callback_query.answer(text="在准备运行结果时派蒙出问题了", show_alert=True) + return + + render_result = await self.gcsim_renderer.render(script_key, result_) + reply = await render_result.reply_photo( + msg_to_reply, + filename=f"gcsim_{uid}_{script_key}.png", + caption=f"GCSim {script_key} 运行结果", + ) + self.add_delete_message_job(message, delay=1) + if reply and reply.photo: + await self.gcsim_runner.cache.set_cache(uid, hash(str(result.script)), reply.photo[0].file_id) diff --git a/plugins/genshin/gcsim/renderer.py b/plugins/genshin/gcsim/renderer.py new file mode 100644 index 00000000..6afbb5a8 --- /dev/null +++ b/plugins/genshin/gcsim/renderer.py @@ -0,0 +1,93 @@ +import json +from pathlib import Path +from typing import Optional, List + +from core.dependence.assets import AssetsService +from gram_core.services.template.models import RenderResult +from gram_core.services.template.services import TemplateService +from metadata.shortname import idToName, elementToName, elementsToColor +from plugins.genshin.model import GCSim, GCSimCharacterInfo, CharacterInfo +from plugins.genshin.model.converters.gcsim import GCSimConverter + + +class GCSimResultRenderer: + def __init__(self, assets_service: AssetsService, template_service: TemplateService): + self.assets_service = assets_service + self.template_service = template_service + + async def prepare_result( + self, result_path: Path, script: GCSim, character_infos: List[CharacterInfo] + ) -> Optional[dict]: + result = json.loads(result_path.read_text(encoding="utf-8")) + characters = {ch.character for ch in character_infos} + result["extra"] = {} + for idx, character_details in enumerate(result["character_details"]): + asset_id, _ = GCSimConverter.to_character(character_details["name"]) + gcsim_character: GCSimCharacterInfo = next( + filter(lambda gc, cn=character_details["name"]: gc.character == cn, script.characters), None + ) + if not gcsim_character: + return None + if character_details["name"] not in result["extra"]: + result["extra"][character_details["name"]] = {} + if GCSimConverter.to_character(gcsim_character.character)[1] in characters: + result["extra"][character_details["name"]]["owned"] = True + else: + result["extra"][character_details["name"]]["owned"] = False + + result["extra"][character_details["name"]]["icon"] = ( + await self.assets_service.avatar(asset_id).icon() + ).as_uri() + result["extra"][character_details["name"]]["rarity"] = self.assets_service.avatar(asset_id).enka.rarity + result["extra"][character_details["name"]]["constellation"] = gcsim_character.constellation + + if "character_dps" not in result["extra"]: + result["extra"]["character_dps"] = [] + result["extra"]["character_dps"].append( + {"value": result["statistics"]["character_dps"][idx]["mean"], "name": idToName(asset_id)} + ) + result["extra"]["element_dps"] = [ + {"value": data["mean"], "name": elementToName(elem), "itemStyle": {"color": elementsToColor[elem]}} + for elem, data in result["statistics"]["element_dps"].items() + ] + result["extra"]["damage"] = { + "xAxis": [i * 0.5 for i in range(len(result["statistics"]["damage_buckets"]["buckets"]))], + "series": [ + { + "data": [bucket["mean"] for bucket in result["statistics"]["damage_buckets"]["buckets"]], + "type": "line", + "stack": "x", + "areaStyle": {}, + "name": "平均伤害", + }, + { + "data": [bucket["min"] for bucket in result["statistics"]["damage_buckets"]["buckets"]], + "type": "line", + "stack": "x", + "name": "最小伤害", + }, + { + "data": [bucket["max"] for bucket in result["statistics"]["damage_buckets"]["buckets"]], + "type": "line", + "stack": "x", + "name": "最大伤害", + }, + { + "data": [bucket["sd"] for bucket in result["statistics"]["damage_buckets"]["buckets"]], + "type": "line", + "stack": "x", + "name": "标准差", + }, + ], + } + + return result + + async def render(self, script_key: str, data: dict) -> RenderResult: + return await self.template_service.render( + "genshin/gcsim/result.jinja2", + {"script_key": script_key, **data}, + full_page=True, + query_selector="body > div", + ttl=7 * 24 * 60 * 60, + ) diff --git a/plugins/genshin/gcsim/runner.py b/plugins/genshin/gcsim/runner.py new file mode 100644 index 00000000..90cf4045 --- /dev/null +++ b/plugins/genshin/gcsim/runner.py @@ -0,0 +1,229 @@ +import asyncio +import multiprocessing +import platform +import time +from dataclasses import dataclass +from pathlib import Path +from queue import Queue +from typing import Optional, Dict, List, Union, TYPE_CHECKING, Tuple + +import gcsim_pypi +from pydantic import BaseModel + +from metadata.shortname import idToName +from modules.apihelper.client.components.remote import Remote +from modules.gcsim.cache import GCSimCache +from modules.gcsim.file import PlayerGCSimScripts +from plugins.genshin.model.base import CharacterInfo, Character +from plugins.genshin.model.converters.gcsim import GCSimConverter +from plugins.genshin.model.gcsim import GCSim, GCSimCharacter +from utils.const import DATA_DIR +from utils.log import logger + +if TYPE_CHECKING: + from core.dependence.redisdb import RedisDB + +GCSIM_SCRIPTS_PATH = DATA_DIR / "gcsim" / "scripts" +GCSIM_SCRIPTS_PATH.mkdir(parents=True, exist_ok=True) + + +class FitCharacter(BaseModel): + id: int + name: str + gcsim: GCSimCharacter + character: Character + + def __str__(self): + return self.name + + +class GCSimFit(BaseModel): + script_key: str + fit_count: int + characters: List[FitCharacter] + total_levels: int + total_weapon_levels: int + + +@dataclass +class GCSimResult: + error: Optional[str] + user_id: str + uid: str + script_key: str + script: Optional[GCSim] = None + file_id: Optional[str] = None + + +def _get_gcsim_bin_name() -> str: + if platform.system() == "Windows": + return "gcsim.exe" + bin_name = "gcsim" + if platform.system() == "Darwin": + bin_name += ".darwin" + if platform.machine() == "arm64": + bin_name += ".arm64" + return bin_name + + +def _get_limit_command() -> str: + if platform.system() == "Linux": + return "ulimit -m 1000000 && ulimit -v 1000000 && timeout 120 " + return "" + + +class GCSimRunner: + def __init__(self, client: "RedisDB"): + self.initialized = False + self.bin_path = None + self.player_gcsim_scripts = PlayerGCSimScripts() + self.gcsim_version: Optional[str] = None + self.scripts: Dict[str, GCSim] = {} + max_concurrent_gcsim = multiprocessing.cpu_count() + self.sema = asyncio.BoundedSemaphore(max_concurrent_gcsim) + self.queue: Queue[None] = Queue() + self.cache = GCSimCache(client) + + @staticmethod + def check_gcsim_script(name: str, script: str) -> Optional[GCSim]: + try: + return GCSimConverter.from_gcsim_script(script) + except ValueError as e: + logger.error("无法解析 GCSim 脚本 %s: %s", name, e) + return None + + async def refresh(self): + self.player_gcsim_scripts.clear_fits() + self.scripts.clear() + new_scripts = await Remote.get_gcsim_scripts() + for name, text in new_scripts.items(): + if script := self.check_gcsim_script(name, text): + self.scripts[name] = script + for path in GCSIM_SCRIPTS_PATH.iterdir(): + if path.is_file(): + with open(path, "r", encoding="utf-8") as f: + try: + if script := self.check_gcsim_script(path.name, f.read()): + self.scripts[path.stem] = script + except UnicodeError as e: + logger.error("无法读取 GCSim 脚本 %s: %s", path.name, e) + + async def initialize(self): + gcsim_pypi_path = Path(gcsim_pypi.__file__).parent + + self.bin_path = gcsim_pypi_path.joinpath("bin").joinpath(_get_gcsim_bin_name()) + + process = await asyncio.create_subprocess_exec( + self.bin_path, "-version", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + + stdout, stderr = await process.communicate() + + if process.returncode == 0: + self.gcsim_version = stdout.decode().splitlines()[0] + logger.debug("GCSim version: %s", self.gcsim_version) + else: + logger.error("GCSim 运行时出错: %s", stderr.decode()) + + now = time.time() + await self.refresh() + logger.debug("加载 %d GCSim 脚本耗时 %.2f 秒", len(self.scripts), time.time() - now) + self.initialized = True + + async def _execute_gcsim( + self, user_id: str, uid: str, script_key: str, added_time: float, character_infos: List[CharacterInfo] + ) -> GCSimResult: + script = self.scripts.get(script_key) + if script is None: + return GCSimResult(error="未找到脚本", user_id=user_id, uid=uid, script_key=script_key) + try: + merged_script = GCSimConverter.merge_character_infos(script, character_infos) + except ValueError: + return GCSimResult(error="无法合并角色信息", user_id=user_id, uid=uid, script_key=script_key) + if file_id := await self.cache.get_cache(uid, hash(str(merged_script))): + return GCSimResult(error=None, user_id=user_id, uid=uid, script_key=script_key, file_id=file_id) + await self.player_gcsim_scripts.write_script(uid, script_key, str(merged_script)) + limit = _get_limit_command() + command = [ + self.bin_path, + "-c", + self.player_gcsim_scripts.get_script_path(uid, script_key).absolute().as_posix(), + "-out", + self.player_gcsim_scripts.get_result_path(uid, script_key).absolute().as_posix(), + ] + process = await asyncio.create_subprocess_shell( + limit + " ".join([str(i) for i in command]), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await process.communicate() + logger.debug("GCSim 脚本 (%s|%s|%s) 用时 %.2fs", user_id, uid, script_key, time.time() - added_time) + error = None + if stderr: + error = stderr.decode()[:500] + if "out of memory" in error: + error = "超出内存限制" + if process.returncode == 124: + error = "超出运行时间限制" + if error: + logger.error("GCSim 脚本 (%s|%s|%s) 错误: %s", user_id, uid, script_key, error) + return GCSimResult(error=error, user_id=user_id, uid=uid, script_key=script_key, script=merged_script) + if stdout: + logger.info("GCSim 脚本 (%s|%s|%s) 运行完成", user_id, uid, script_key) + logger.debug("GCSim 脚本 (%s|%s|%s) 输出: %s", user_id, uid, script_key, stdout.decode()) + return GCSimResult(error=None, user_id=user_id, uid=uid, script_key=script_key, script=merged_script) + return GCSimResult( + error="No output", + user_id=user_id, + uid=uid, + script_key=script_key, + script=merged_script, + ) + + async def run( + self, + user_id: str, + uid: str, + script_key: str, + character_infos: List[CharacterInfo], + ) -> GCSimResult: + start_time = time.time() + async with self.sema: + result = await self._execute_gcsim(user_id, uid, script_key, start_time, character_infos) + return result + + async def calculate_fits(self, uid: Union[int, str], character_infos: List[CharacterInfo]) -> List[GCSimFit]: + fits = [] + for key, script in self.scripts.items(): + # 空和莹会被认为是两个角色 + fit_characters: List[Tuple[CharacterInfo, GCSimCharacter]] = [] + for ch in character_infos: + gcsim_character = GCSimConverter.from_character(ch.character) + if gcsim_character in [c.character for c in script.characters]: + fit_characters.append((ch, gcsim_character)) + if fit_characters: + fits.append( + GCSimFit( + script_key=key, + characters=[ + FitCharacter(id=ch[0].id, name=idToName(ch[0].id), gcsim=ch[1], character=ch[0].character) + for ch in fit_characters + ], + fit_count=len(fit_characters), + total_levels=sum(ch.level for ch in script.characters), + total_weapon_levels=sum(ch.weapon_info.level for ch in script.characters), + ) + ) + fits = sorted( + fits, + key=lambda x: (x.fit_count, x.total_levels, x.total_weapon_levels), + reverse=True, + ) + await self.player_gcsim_scripts.write_fits(uid, [fit.dict() for fit in fits]) + return fits + + async def get_fits(self, uid: Union[int, str]) -> List[GCSimFit]: + return [GCSimFit(**fit) for fit in self.player_gcsim_scripts.get_fits(uid)] + + async def remove_fits(self, uid: Union[int, str]) -> None: + self.player_gcsim_scripts.remove_fits(uid) diff --git a/plugins/genshin/model/__init__.py b/plugins/genshin/model/__init__.py new file mode 100644 index 00000000..e178a1a4 --- /dev/null +++ b/plugins/genshin/model/__init__.py @@ -0,0 +1,29 @@ +from plugins.genshin.model.base import * + +from plugins.genshin.model.gcsim import * + +__all__ = [ + "Digit", + "DigitType", + "Character", + "CharacterInfo", + "CharacterStats", + "Weapon", + "WeaponInfo", + "WeaponType", + "Set", + "Artifact", + "ArtifactAttribute", + "ArtifactAttributeType", + "ArtifactPosition", + "GCSimCharacter", + "GCSimSet", + "GCSimSetInfo", + "GCSimWeapon", + "GCSimWeaponInfo", + "GCSimCharacterStats", + "GCSimCharacterInfo", + "GCSimTarget", + "GCSimEnergySettings", + "GCSim", +] diff --git a/plugins/genshin/model/base.py b/plugins/genshin/model/base.py new file mode 100644 index 00000000..e1730d9b --- /dev/null +++ b/plugins/genshin/model/base.py @@ -0,0 +1,198 @@ +from decimal import Decimal +from enum import Enum +from typing import Optional, List, NewType + +from pydantic import BaseModel, Field, validator + +# TODO: 考虑自动生成Enum +Character = NewType("Character", str) +Weapon = NewType("Weapon", str) +Set = NewType("Set", str) + + +class DigitType(Enum): + NUMERIC = "numeric" + PERCENT = "percent" + + +class Digit(BaseModel): + type: DigitType + value: Decimal + + +class WeaponType(Enum): + BOW = "bow" + CLAYMORE = "claymore" + CATALYST = "catalyst" + POLEARM = "polearm" + SWORD = "sword" + + +class ArtifactPosition(Enum): + FLOWER = "flower" + PLUME = "plume" + SANDS = "sands" + GOBLET = "goblet" + CIRCLET = "circlet" + + +class ArtifactAttributeType(Enum): + HP = "hp" + ATK = "atk" + DEF = "def" + HP_PERCENT = "hp_percent" + ATK_PERCENT = "atk_percent" + DEF_PERCENT = "def_percent" + ELEMENTAL_MASTERY = "elemental_mastery" + ENERGY_RECHARGE = "energy_recharge" + CRIT_RATE = "crit_rate" + CRIT_DMG = "crit_dmg" + HEALING_BONUS = "healing_bonus" + PYRO_DMG_BONUS = "pyro_dmg_bonus" + HYDRO_DMG_BONUS = "hydro_dmg_bonus" + DENDRO_DMG_BONUS = "dendro_dmg_bonus" + ELECTRO_DMG_BONUS = "electro_dmg_bonus" + ANEMO_DMG_BONUS = "anemo_dmg_bonus" + CRYO_DMG_BONUS = "cryo_dmg_bonus" + GEO_DMG_BONUS = "geo_dmg_bonus" + PHYSICAL_DMG_BONUS = "physical_dmg_bonus" + + +class ArtifactAttribute(BaseModel): + type: ArtifactAttributeType + digit: Digit + + +class WeaponInfo(BaseModel): + id: int = 0 + weapon: Weapon = "" + type: WeaponType + level: int = 0 + max_level: int = 0 + refinement: int = 0 + ascension: int = 0 + + @validator("max_level") + def validate_max_level(cls, v, values): + if v == 0: + return values["level"] + if v < values["level"]: + raise ValueError("max_level must be greater than or equal to level") + return v + + @validator("refinement") + def validate_refinement(cls, v): + if v < 0 or v > 5: + raise ValueError("refinement must be between 1 and 5") + return v + + +class Artifact(BaseModel): + id: int = 0 + set: Set = "" + position: ArtifactPosition + level: int = 0 + rarity: int = 0 + main_attribute: ArtifactAttribute + sub_attributes: List[ArtifactAttribute] = [] + + @validator("level") + def validate_level(cls, v): + if v < 0 or v > 20: + raise ValueError("level must be between 0 and 20") + return v + + @validator("rarity") + def validate_rarity(cls, v): + if v < 0 or v > 5: + raise ValueError("rarity must be between 0 and 5") + return v + + @validator("sub_attributes") + def validate_sub_attributes(cls, v): + if len(v) > 4: + raise ValueError("sub_attributes must not be greater than 4") + return v + + +class CharacterStats(BaseModel): + BASE_HP: Digit = Digit(type=DigitType.NUMERIC, value=Decimal(0)) + HP: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_HP") + HP_PERCENT: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_HP_PERCENT") + BASE_ATTACK: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_BASE_ATTACK") + ATTACK: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_ATTACK") + ATTACK_PERCENT: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ATTACK_PERCENT") + BASE_DEFENSE: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_BASE_DEFENSE") + DEFENSE: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_DEFENSE") + DEFENSE_PERCENT: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_DEFENSE_PERCENT") + ELEMENTAL_MASTERY: Digit = Field( + Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_ELEMENT_MASTERY" + ) + + CRIT_RATE: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_CRITICAL") + CRIT_DMG: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_CRITICAL_HURT") + HEALING_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_HEAL_ADD") + INCOMING_HEALING_BONUS: Digit = Field( + Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_HEALED_ADD" + ) + ENERGY_RECHARGE: Digit = Field( + Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_CHARGE_EFFICIENCY" + ) + CD_REDUCTION: Digit = Field( + Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_SKILL_CD_MINUS_RATIO" + ) + SHIELD_STRENGTH: Digit = Field( + Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_SHIELD_COST_MINUS_RATIO" + ) + + PYRO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_FIRE_ADD_HURT") + PYRO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_FIRE_SUB_HURT") + HYDRO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_WATER_ADD_HURT") + HYDRO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_WATER_SUB_HURT") + DENDRO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_GRASS_ADD_HURT") + DENDRO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_GRASS_SUB_HURT") + ELECTRO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ELEC_ADD_HURT") + ELECTRO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ELEC_SUB_HURT") + ANEMO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_WIND_ADD_HURT") + ANEMO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_WIND_SUB_HURT") + CRYO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ICE_ADD_HURT") + CRYO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ICE_SUB_HURT") + GEO_DMG_BONUS: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ROCK_ADD_HURT") + GEO_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_ROCK_SUB_HURT") + PHYSICAL_DMG_BONUS: Digit = Field( + Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_PHYSICAL_SUB_HURT" + ) + PHYSICAL_RES: Digit = Field(Digit(type=DigitType.PERCENT, value=Decimal(0)), alias="FIGHT_PROP_PHYSICAL_ADD_HURT") + + CURRENT_HP: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_CUR_HP") + MAX_HP: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_MAX_HP") + CURRENT_ATTACK: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_CUR_ATTACK") + CURRENT_DEFENSE: Digit = Field(Digit(type=DigitType.NUMERIC, value=Decimal(0)), alias="FIGHT_PROP_CUR_DEFENSE") + + +class CharacterInfo(BaseModel): + id: int = 0 + character: Character = "" + weapon_info: Optional[WeaponInfo] = None + artifacts: List[Artifact] = [] + level: int = 0 + max_level: int = 0 + constellation: int = 0 + ascension: int = 0 + skills: List[int] = [] + rarity: int = 0 + stats: CharacterStats = CharacterStats() + + @validator("max_level") + def validate_max_level(cls, v, values): + if v == 0: + return values["level"] + if v < values["level"]: + raise ValueError("max_level must be greater than or equal to level") + return v + + @validator("skills") + def validate_skills(cls, v): + if len(v) > 3: + raise ValueError("skills must not be greater than 3") + return v diff --git a/plugins/genshin/model/converters/enka.py b/plugins/genshin/model/converters/enka.py new file mode 100644 index 00000000..1df23a78 --- /dev/null +++ b/plugins/genshin/model/converters/enka.py @@ -0,0 +1,197 @@ +from decimal import Decimal + +from enkanetwork import ( + CharacterInfo as EnkaCharacterInfo, + CharacterStats as EnkaCharacterStats, + StatsPercentage, + Equipments, + EquipmentsType, + EquipType, + EquipmentsStats, + DigitType as EnkaDigitType, +) + +from plugins.genshin.model import ( + CharacterInfo, + Digit, + DigitType, + CharacterStats, + WeaponInfo, + WeaponType, + Artifact, + ArtifactPosition, + ArtifactAttribute, + ArtifactAttributeType, +) +from plugins.genshin.model.metadata import Metadata + +metadata = Metadata() + + +class EnkaConverter: + @classmethod + def to_weapon_type(cls, type_str: str) -> WeaponType: + if type_str == "WEAPON_BOW": + return WeaponType.BOW + if type_str == "WEAPON_CATALYST": + return WeaponType.CATALYST + if type_str == "WEAPON_CLAYMORE": + return WeaponType.CLAYMORE + if type_str == "WEAPON_POLE": + return WeaponType.POLEARM + if type_str == "WEAPON_SWORD_ONE_HAND": + return WeaponType.SWORD + if type_str == "单手剑": + return WeaponType.SWORD + raise ValueError(f"Unknown weapon type: {type_str}") + + @classmethod + def to_weapon_info(cls, equipment: Equipments) -> WeaponInfo: + if equipment.type != EquipmentsType.WEAPON: + raise ValueError(f"Not weapon equipment type: {equipment.type}") + + weapon_data = metadata.weapon_metadata.get(str(equipment.id)) + if not weapon_data: + raise ValueError(f"Unknown weapon id: {equipment.id}") + + return WeaponInfo( + id=equipment.id, + weapon=weapon_data["route"], + type=cls.to_weapon_type(weapon_data["type"]), + level=equipment.level, + max_level=equipment.max_level, + refinement=equipment.refinement, + ascension=equipment.ascension, + ) + + @classmethod + def to_artifact_attribute_type(cls, prop_id: str) -> ArtifactAttributeType: # skipcq: PY-R1000 + if prop_id == "FIGHT_PROP_HP": + return ArtifactAttributeType.HP + if prop_id == "FIGHT_PROP_ATTACK": + return ArtifactAttributeType.ATK + if prop_id == "FIGHT_PROP_DEFENSE": + return ArtifactAttributeType.DEF + if prop_id == "FIGHT_PROP_HP_PERCENT": + return ArtifactAttributeType.HP_PERCENT + if prop_id == "FIGHT_PROP_ATTACK_PERCENT": + return ArtifactAttributeType.ATK_PERCENT + if prop_id == "FIGHT_PROP_DEFENSE_PERCENT": + return ArtifactAttributeType.DEF_PERCENT + if prop_id == "FIGHT_PROP_ELEMENT_MASTERY": + return ArtifactAttributeType.ELEMENTAL_MASTERY + if prop_id == "FIGHT_PROP_CHARGE_EFFICIENCY": + return ArtifactAttributeType.ENERGY_RECHARGE + if prop_id == "FIGHT_PROP_CRITICAL": + return ArtifactAttributeType.CRIT_RATE + if prop_id == "FIGHT_PROP_CRITICAL_HURT": + return ArtifactAttributeType.CRIT_DMG + if prop_id == "FIGHT_PROP_HEAL_ADD": + return ArtifactAttributeType.HEALING_BONUS + if prop_id == "FIGHT_PROP_FIRE_ADD_HURT": + return ArtifactAttributeType.PYRO_DMG_BONUS + if prop_id == "FIGHT_PROP_WATER_ADD_HURT": + return ArtifactAttributeType.HYDRO_DMG_BONUS + if prop_id == "FIGHT_PROP_ELEC_ADD_HURT": + return ArtifactAttributeType.ELECTRO_DMG_BONUS + if prop_id == "FIGHT_PROP_ICE_ADD_HURT": + return ArtifactAttributeType.CRYO_DMG_BONUS + if prop_id == "FIGHT_PROP_WIND_ADD_HURT": + return ArtifactAttributeType.ANEMO_DMG_BONUS + if prop_id == "FIGHT_PROP_ROCK_ADD_HURT": + return ArtifactAttributeType.GEO_DMG_BONUS + if prop_id == "FIGHT_PROP_GRASS_ADD_HURT": + return ArtifactAttributeType.DENDRO_DMG_BONUS + if prop_id == "FIGHT_PROP_PHYSICAL_ADD_HURT": + return ArtifactAttributeType.PHYSICAL_DMG_BONUS + raise ValueError(f"Unknown artifact attribute type: {prop_id}") + + @classmethod + def to_artifact_attribute(cls, equip_stat: EquipmentsStats) -> ArtifactAttribute: + return ArtifactAttribute( + type=cls.to_artifact_attribute_type(equip_stat.prop_id), + digit=Digit( + value=Decimal(equip_stat.value), + type=DigitType.PERCENT if equip_stat.type == EnkaDigitType.PERCENT else DigitType.NUMERIC, + ), + ) + + @classmethod + def to_artifact_position(cls, equip_type: EquipType) -> ArtifactPosition: + if equip_type == EquipType.Flower: + return ArtifactPosition.FLOWER + if equip_type == EquipType.Feather: + return ArtifactPosition.PLUME + if equip_type == EquipType.Sands: + return ArtifactPosition.SANDS + if equip_type == EquipType.Goblet: + return ArtifactPosition.GOBLET + if equip_type == EquipType.Circlet: + return ArtifactPosition.CIRCLET + raise ValueError(f"Unknown artifact position: {equip_type}") + + @classmethod + def to_artifact(cls, equipment: Equipments) -> Artifact: + if equipment.type != EquipmentsType.ARTIFACT: + raise ValueError(f"Not artifact equipment type: {equipment.type}") + + artifact_data = next( + ( + data + for data in metadata.artifacts_metadata.values() + if data["name"] == equipment.detail.artifact_name_set + ), + None, + ) + if not artifact_data: + raise ValueError(f"Unknown artifact: {equipment}") + + return Artifact( + id=artifact_data["id"], + set=artifact_data["route"], + position=cls.to_artifact_position(equipment.detail.artifact_type), + level=equipment.level, + rarity=equipment.detail.rarity, + main_attribute=cls.to_artifact_attribute(equipment.detail.mainstats), + sub_attributes=[cls.to_artifact_attribute(stat) for stat in equipment.detail.substats], + ) + + @classmethod + def to_character_stats(cls, character_stats: EnkaCharacterStats) -> CharacterStats: + return CharacterStats( + **{ + stat: Digit( + value=Decimal(value.value), + type=DigitType.PERCENT if isinstance(value, StatsPercentage) else DigitType.NUMERIC, + ) + for stat, value in character_stats._iter() # pylint: disable=W0212 + } + ) + + @classmethod + def to_character(cls, character_info: EnkaCharacterInfo) -> str: + character_id = str(character_info.id) + if character_id in ("10000005", "10000007"): + character_id += f"-{character_info.element.name.lower()}" + character_data = metadata.characters_metadata.get(character_id) + if not character_data: + raise ValueError(f"Unknown character: {character_info.name}\n{character_info}") + return character_data["route"] + + @classmethod + def to_character_info(cls, character_info: EnkaCharacterInfo) -> CharacterInfo: + weapon_equip = next((equip for equip in character_info.equipments if equip.type == EquipmentsType.WEAPON), None) + artifacts_equip = [equip for equip in character_info.equipments if equip.type == EquipmentsType.ARTIFACT] + return CharacterInfo( + id=character_info.id, + character=cls.to_character(character_info), + rarity=character_info.rarity, + weapon_info=cls.to_weapon_info(weapon_equip) if weapon_equip else None, + artifacts=[cls.to_artifact(equip) for equip in artifacts_equip], + level=character_info.level, + max_level=character_info.max_level, + ascension=character_info.ascension, + constellation=character_info.constellations_unlocked, + skills=[skill.level for skill in character_info.skills], + stats=cls.to_character_stats(character_info.stats), + ) diff --git a/plugins/genshin/model/converters/gcsim.py b/plugins/genshin/model/converters/gcsim.py new file mode 100644 index 00000000..dfd4b166 --- /dev/null +++ b/plugins/genshin/model/converters/gcsim.py @@ -0,0 +1,405 @@ +import re +from collections import Counter +from decimal import Decimal +from functools import lru_cache +from typing import List, Optional, Tuple, Dict + +from gcsim_pypi.aliases import CHARACTER_ALIASES, WEAPON_ALIASES, ARTIFACT_ALIASES +from pydantic import ValidationError + +from plugins.genshin.model import ( + Set, + Weapon, + DigitType, + WeaponInfo, + Artifact, + ArtifactAttributeType, + Character, + CharacterInfo, + GCSim, + GCSimTarget, + GCSimWeapon, + GCSimWeaponInfo, + GCSimSet, + GCSimSetInfo, + GCSimCharacter, + GCSimEnergySettings, + GCSimCharacterInfo, + GCSimCharacterStats, +) +from plugins.genshin.model.metadata import Metadata +from utils.log import logger + +metadata = Metadata() + + +def remove_non_words(text: str) -> str: + return text.replace("'", "").replace('"', "").replace("-", "").replace(" ", "") + + +def from_character_gcsim_character(character: Character) -> GCSimCharacter: + if character == "Raiden Shogun": + return GCSimCharacter("raiden") + if character == "Yae Miko": + return GCSimCharacter("yaemiko") + if character == "Hu Tao": + return GCSimCharacter("hutao") + if character == "Yun Jin": + return GCSimCharacter("yunjin") + if character == "Kuki Shinobu": + return GCSimCharacter("kuki") + if "Traveler" in character: + s = character.split(" ") + traveler_name = "aether" if s[-1] == "Boy" else "lumine" + return GCSimCharacter(f"{traveler_name}{s[0].lower()}") + return GCSimCharacter(character.split(" ")[-1].lower()) + + +GCSIM_CHARACTER_TO_CHARACTER: Dict[GCSimCharacter, Tuple[int, Character]] = {} +for char in metadata.characters_metadata.values(): + GCSIM_CHARACTER_TO_CHARACTER[from_character_gcsim_character(char["route"])] = (char["id"], char["route"]) +for alias, char in CHARACTER_ALIASES.items(): + if alias not in GCSIM_CHARACTER_TO_CHARACTER: + if char in GCSIM_CHARACTER_TO_CHARACTER: + GCSIM_CHARACTER_TO_CHARACTER[alias] = GCSIM_CHARACTER_TO_CHARACTER[char] + elif alias.startswith("traveler") or alias.startswith("aether") or alias.startswith("lumine"): + continue + else: + logger.warning("Character alias %s not found in GCSIM", alias) + +GCSIM_WEAPON_TO_WEAPON: Dict[GCSimWeapon, Tuple[int, Weapon]] = {} +for _weapon in metadata.weapon_metadata.values(): + GCSIM_WEAPON_TO_WEAPON[remove_non_words(_weapon["route"].lower())] = (_weapon["id"], _weapon["route"]) +for alias, _weapon in WEAPON_ALIASES.items(): + if alias not in GCSIM_WEAPON_TO_WEAPON: + if _weapon in GCSIM_WEAPON_TO_WEAPON: + GCSIM_WEAPON_TO_WEAPON[alias] = GCSIM_WEAPON_TO_WEAPON[_weapon] + else: + logger.warning("Weapon alias %s not found in GCSIM", alias) + +GCSIM_ARTIFACT_TO_ARTIFACT: Dict[GCSimSet, Tuple[int, Set]] = {} +for _artifact in metadata.artifacts_metadata.values(): + GCSIM_ARTIFACT_TO_ARTIFACT[remove_non_words(_artifact["route"].lower())] = (_artifact["id"], _artifact["route"]) +for alias, _artifact in ARTIFACT_ALIASES.items(): + if alias not in GCSIM_ARTIFACT_TO_ARTIFACT: + if _artifact in GCSIM_ARTIFACT_TO_ARTIFACT: + GCSIM_ARTIFACT_TO_ARTIFACT[alias] = GCSIM_ARTIFACT_TO_ARTIFACT[_artifact] + else: + logger.warning("Artifact alias %s not found in GCSIM", alias) + + +class GCSimConverter: + literal_keys_numeric_values_regex = re.compile( + r"([\w_%]+)=(\d+ *, *\d+ *, *\d+|[\d*\.*\d+]+ *, *[\d*\.*\d+]+|\d+/\d+|\d*\.*\d+|\d+)" + ) + + @classmethod + def to_character(cls, character: GCSimCharacter) -> Tuple[int, Character]: + return GCSIM_CHARACTER_TO_CHARACTER[character] + + @classmethod + def from_character(cls, character: Character) -> GCSimCharacter: + return from_character_gcsim_character(character) + + @classmethod + def to_weapon(cls, weapon: GCSimWeapon) -> Tuple[int, Weapon]: + return GCSIM_WEAPON_TO_WEAPON[weapon] + + @classmethod + def from_weapon(cls, weapon: Weapon) -> GCSimWeapon: + return GCSimWeapon(remove_non_words(weapon).lower()) + + @classmethod + def from_weapon_info(cls, weapon_info: Optional[WeaponInfo]) -> GCSimWeaponInfo: + if weapon_info is None: + return GCSimWeaponInfo(weapon=GCSimWeapon("dullblade"), refinement=1, level=1, max_level=20) + return GCSimWeaponInfo( + weapon=cls.from_weapon(weapon_info.weapon), + refinement=weapon_info.refinement, + level=weapon_info.level, + max_level=weapon_info.max_level, + ) + + @classmethod + def to_set(cls, set_name: GCSimSet) -> Tuple[int, Set]: + return GCSIM_ARTIFACT_TO_ARTIFACT[set_name] + + @classmethod + def from_set(cls, set_name: Set) -> GCSimSet: + return GCSimSet(remove_non_words(set_name).lower()) + + @classmethod + def from_artifacts(cls, artifacts: List[Artifact]) -> List[GCSimSetInfo]: + c = Counter() + for art in artifacts: + c[cls.from_set(art.set)] += 1 + return [GCSimSetInfo(set=set_name, count=count) for set_name, count in c.items()] + + @classmethod + @lru_cache + def from_attribute_type(cls, attribute_type: ArtifactAttributeType) -> str: # skipcq: PY-R1000 + if attribute_type == ArtifactAttributeType.HP: + return "HP" + if attribute_type == ArtifactAttributeType.HP_PERCENT: + return "HP_PERCENT" + if attribute_type == ArtifactAttributeType.ATK: + return "ATK" + if attribute_type == ArtifactAttributeType.ATK_PERCENT: + return "ATK_PERCENT" + if attribute_type == ArtifactAttributeType.DEF: + return "DEF" + if attribute_type == ArtifactAttributeType.DEF_PERCENT: + return "DEF_PERCENT" + if attribute_type == ArtifactAttributeType.ELEMENTAL_MASTERY: + return "EM" + if attribute_type == ArtifactAttributeType.ENERGY_RECHARGE: + return "ER" + if attribute_type == ArtifactAttributeType.CRIT_RATE: + return "CR" + if attribute_type == ArtifactAttributeType.CRIT_DMG: + return "CD" + if attribute_type == ArtifactAttributeType.HEALING_BONUS: + return "HEAL" + if attribute_type == ArtifactAttributeType.PYRO_DMG_BONUS: + return "PYRO_PERCENT" + if attribute_type == ArtifactAttributeType.HYDRO_DMG_BONUS: + return "HYDRO_PERCENT" + if attribute_type == ArtifactAttributeType.DENDRO_DMG_BONUS: + return "DENDRO_PERCENT" + if attribute_type == ArtifactAttributeType.ELECTRO_DMG_BONUS: + return "ELECTRO_PERCENT" + if attribute_type == ArtifactAttributeType.ANEMO_DMG_BONUS: + return "ANEMO_PERCENT" + if attribute_type == ArtifactAttributeType.CRYO_DMG_BONUS: + return "CRYO_PERCENT" + if attribute_type == ArtifactAttributeType.GEO_DMG_BONUS: + return "GEO_PERCENT" + if attribute_type == ArtifactAttributeType.PHYSICAL_DMG_BONUS: + return "PHYS_PERCENT" + raise ValueError(f"Unknown attribute type: {attribute_type}") + + @classmethod + def from_artifacts_stats(cls, artifacts: List[Artifact]) -> GCSimCharacterStats: + gcsim_stats = GCSimCharacterStats() + for art in artifacts: + main_attr_name = cls.from_attribute_type(art.main_attribute.type) + setattr( + gcsim_stats, + main_attr_name, + getattr(gcsim_stats, main_attr_name) + + ( + Decimal(art.main_attribute.digit.value) / Decimal(100) + if art.main_attribute.digit.type == DigitType.PERCENT + else Decimal(art.main_attribute.digit.value) + ), + ) + for sub_attr in art.sub_attributes: + attr_name = cls.from_attribute_type(sub_attr.type) + setattr( + gcsim_stats, + attr_name, + getattr(gcsim_stats, attr_name) + + ( + Decimal(sub_attr.digit.value) / Decimal(100) + if sub_attr.digit.type == DigitType.PERCENT + else Decimal(sub_attr.digit.value) + ), + ) + return gcsim_stats + + @classmethod + def from_character_info(cls, character: CharacterInfo) -> GCSimCharacterInfo: + return GCSimCharacterInfo( + character=cls.from_character(character.character), + level=character.level, + max_level=character.max_level, + constellation=character.constellation, + talent=character.skills, + weapon_info=cls.from_weapon_info(character.weapon_info), + set_info=cls.from_artifacts(character.artifacts), + # NOTE: Only stats from arifacts are needed + stats=cls.from_artifacts_stats(character.artifacts), + ) + + @classmethod + def merge_character_infos(cls, gcsim: GCSim, character_infos: List[CharacterInfo]) -> GCSim: + gcsim_characters = {ch.character: ch for ch in gcsim.characters} + for character_info in character_infos: + try: + gcsim_character = cls.from_character_info(character_info) + if gcsim_character.character in gcsim_characters: + gcsim_characters[gcsim_character.character] = gcsim_character + except ValidationError as e: + errors = e.errors() + if errors and errors[0].get("msg").startswith("Not supported"): + # Something is not supported, skip + continue + logger.warning("Failed to convert character info: %s", character_info) + gcsim.characters = list(gcsim_characters.values()) + return gcsim + + @classmethod + def prepend_scripts(cls, gcsim: GCSim, scripts: List[str]) -> GCSim: + gcsim.scripts = scripts + gcsim.scripts + return gcsim + + @classmethod + def append_scripts(cls, gcsim: GCSim, scripts: List[str]) -> GCSim: + gcsim.scripts = gcsim.scripts + scripts + return gcsim + + @classmethod + def from_gcsim_energy(cls, line: str) -> GCSimEnergySettings: + energy_settings = GCSimEnergySettings() + matches = cls.literal_keys_numeric_values_regex.findall(line) + for key, value in matches: + if key == "interval": + energy_settings.intervals = list(map(int, value.split(","))) + elif key == "amount": + energy_settings.amount = int(value) + else: + logger.warning("Unknown energy setting: %s=%s", key, value) + return energy_settings + + @classmethod + def from_gcsim_target(cls, line: str) -> GCSimTarget: + target = GCSimTarget() + matches = cls.literal_keys_numeric_values_regex.findall(line) + for key, value in matches: + if key == "lvl": + target.level = int(value) + elif key == "hp": + target.hp = int(value) + elif key == "amount": + target.amount = int(value) + elif key == "resist": + target.resist = float(value) + elif key == "pos": + target.position = tuple(p for p in value.split(",")) + elif key == "interval": + target.interval = list(map(int, value.split(","))) + elif key == "radius": + target.radius = float(value) + elif key == "particle_threshold": + target.particle_threshold = int(value) + elif key == "particle_drop_count": + target.particle_drop_count = int(value) + elif key in ("pyro", "hydro", "dendro", "electro", "anemo", "cryo", "geo", "physical"): + target.others[key] = float(value) + else: + logger.warning("Unknown target setting: %s=%s", key, value) + return target + + @classmethod + def from_gcsim_char_line(cls, line: str, character: GCSimCharacterInfo) -> GCSimCharacterInfo: + matches = cls.literal_keys_numeric_values_regex.findall(line) + for key, value in matches: + if key == "lvl": + character.level, character.max_level = map(int, value.split("/")) + elif key == "cons": + character.constellation = int(value) + elif key == "talent": + character.talent = list(map(int, value.split(","))) + elif key == "start_hp": + character.start_hp = int(value) + elif key == "breakthrough": + character.params.append(f"{key}={value}") + else: + logger.warning("Unknown character setting: %s=%s", key, value) + return character + + @classmethod + def from_gcsim_weapon_line(cls, line: str, weapon_info: GCSimWeaponInfo) -> GCSimWeaponInfo: + weapon_name = re.search(r"weapon= *\"(.*)\"", line).group(1) + if weapon_name not in WEAPON_ALIASES: + raise ValueError(f"Unknown weapon: {weapon_name}") + weapon_info.weapon = WEAPON_ALIASES[weapon_name] + + for key, value in cls.literal_keys_numeric_values_regex.findall(line): + if key == "refine": + weapon_info.refinement = int(value) + elif key == "lvl": + weapon_info.level, weapon_info.max_level = map(int, value.split("/")) + elif key.startswith("stack"): + weapon_info.params.append(f"stacks={value}") + elif key in ("pickup_delay", "breakthrough"): + weapon_info.params.append(f"{key}={value}") + else: + logger.warning("Unknown weapon setting: %s=%s", key, value) + return weapon_info + + @classmethod + def from_gcsim_set_line(cls, line: str) -> GCSimSetInfo: + gcsim_set = re.search(r"set= *\"(.*)\"", line).group(1) + if gcsim_set not in ARTIFACT_ALIASES: + raise ValueError(f"Unknown set: {gcsim_set}") + gcsim_set = ARTIFACT_ALIASES[gcsim_set] + set_info = GCSimSetInfo(set=gcsim_set) + + for key, value in cls.literal_keys_numeric_values_regex.findall(line): + if key == "count": + set_info.count = int(value) + elif key.startswith("stack"): + set_info.params.append(f"stacks={value}") + else: + logger.warning("Unknown set info: %s=%s", key, value) + return set_info + + @classmethod + def from_gcsim_stats_line(cls, line: str, stats: GCSimCharacterStats) -> GCSimCharacterStats: + matches = re.findall(r"(\w+%?)=(\d*\.*\d+)", line) + for stat, value in matches: + attr = stat.replace("%", "_percent").upper() + setattr(stats, attr, getattr(stats, attr) + Decimal(value)) + return stats + + @classmethod + def from_gcsim_script(cls, script: str) -> GCSim: # skipcq: PY-R1000 + options = "" + characters = {} + character_aliases = {} + active_character = None + targets = [] + energy_settings = GCSimEnergySettings() + script_lines = [] + for line in script.strip().split("\n"): + line = line.split("#")[0].strip() + if not line or line.startswith("#"): + continue + if line.startswith("options"): + options = line.strip(";") + elif line.startswith("target"): + targets.append(cls.from_gcsim_target(line)) + elif line.startswith("energy"): + energy_settings = cls.from_gcsim_energy(line) + elif line.startswith("active"): + active_character = line.strip(";").split(" ")[1] + elif m := re.match(r"(\w+) +(char|add weapon|add set|add stats)\W", line): + if m.group(1) not in CHARACTER_ALIASES: + raise ValueError(f"Unknown character: {m.group(1)}") + c = CHARACTER_ALIASES[m.group(1)] + if c not in characters: + characters[c] = GCSimCharacterInfo(character=c) + if m.group(1) != c: + character_aliases[m.group(1)] = c + if m.group(2) == "char": + characters[c] = cls.from_gcsim_char_line(line, characters[c]) + elif m.group(2) == "add weapon": + characters[c].weapon_info = cls.from_gcsim_weapon_line(line, characters[c].weapon_info) + elif m.group(2) == "add set": + characters[c].set_info.append(cls.from_gcsim_set_line(line)) + elif m.group(2) == "add stats": + characters[c].stats = cls.from_gcsim_stats_line(line, characters[c].stats) + else: + for key, value in character_aliases.items(): + line = line.replace(f"{key} ", f"{value} ") + line = line.replace(f".{key}.", f".{value}.") + script_lines.append(line) + return GCSim( + options=options, + characters=list(characters.values()), + targets=targets, + energy_settings=energy_settings, + active_character=active_character, + script_lines=script_lines, + ) diff --git a/plugins/genshin/model/gcsim.py b/plugins/genshin/model/gcsim.py new file mode 100644 index 00000000..efe1fc2e --- /dev/null +++ b/plugins/genshin/model/gcsim.py @@ -0,0 +1,233 @@ +from decimal import Decimal +from typing import Any, NewType, List, Optional, Tuple, Dict + +from gcsim_pypi.aliases import ARTIFACT_ALIASES, CHARACTER_ALIASES, WEAPON_ALIASES +from gcsim_pypi.availability import AVAILABLE_ARTIFACTS, AVAILABLE_CHARACTERS, AVAILABLE_WEAPONS +from pydantic import BaseModel, validator + +GCSimCharacter = NewType("GCSimCharacter", str) +GCSimWeapon = NewType("GCSimWeapon", str) +GCSimSet = NewType("GCSimSet", str) + + +class GCSimWeaponInfo(BaseModel): + weapon: GCSimWeapon = "dullblade" + refinement: int = 1 + level: int = 1 + max_level: int = 20 + params: List[str] = [] + + @validator("weapon") + def validate_weapon(cls, v): + if v not in AVAILABLE_WEAPONS or v not in WEAPON_ALIASES: + raise ValueError(f"Not supported weapon: {v}") + return WEAPON_ALIASES[v] + + +class GCSimSetInfo(BaseModel): + set: GCSimSet + count: int = 2 + params: List[str] = [] + + @validator("set") + def validate_set(cls, v): + if v not in AVAILABLE_ARTIFACTS or v not in ARTIFACT_ALIASES: + raise ValueError(f"Not supported set: {v}") + return ARTIFACT_ALIASES[v] + + +class GCSimCharacterStats(BaseModel): + HP: Decimal = Decimal(0) + HP_PERCENT: Decimal = Decimal(0) + ATK: Decimal = Decimal(0) + ATK_PERCENT: Decimal = Decimal(0) + DEF: Decimal = Decimal(0) + DEF_PERCENT: Decimal = Decimal(0) + EM: Decimal = Decimal(0) + ER: Decimal = Decimal(0) + CR: Decimal = Decimal(0) + CD: Decimal = Decimal(0) + HEAL: Decimal = Decimal(0) + PYRO_PERCENT: Decimal = Decimal(0) + HYDRO_PERCENT: Decimal = Decimal(0) + DENDRO_PERCENT: Decimal = Decimal(0) + ELECTRO_PERCENT: Decimal = Decimal(0) + ANEMO_PERCENT: Decimal = Decimal(0) + CRYO_PERCENT: Decimal = Decimal(0) + GEO_PERCENT: Decimal = Decimal(0) + PHYS_PERCENT: Decimal = Decimal(0) + + +class GCSimCharacterInfo(BaseModel): + character: GCSimCharacter + level: int = 1 + max_level: int = 20 + constellation: int = 0 + talent: List[int] = [1, 1, 1] + start_hp: Optional[int] = None + weapon_info: GCSimWeaponInfo = GCSimWeaponInfo() + set_info: List[GCSimSetInfo] = [] + stats: GCSimCharacterStats = GCSimCharacterStats() + params: List[str] = [] + + @validator("character") + def validate_character(cls, v): + if v not in AVAILABLE_CHARACTERS or v not in CHARACTER_ALIASES: + raise ValueError(f"Not supported character: {v}") + return CHARACTER_ALIASES[v] + + @property + def char(self) -> str: + return self.character + + @property + def char_line(self) -> str: + return ( + " ".join( + filter( + lambda w: w, + [ + f"{self.char}", + "char", + f"lvl={self.level}/{self.max_level}", + f"cons={self.constellation}", + f"start_hp={self.start_hp}" if self.start_hp is not None else "", + f"talent={','.join(str(t) for t in self.talent)}", + f"+params=[{','.join(self.params)}] " if self.params else "", + ], + ) + ) + + ";" + ) + + @property + def weapon_line(self) -> str: + return ( + " ".join( + filter( + lambda w: w, + [ + f"{self.char}", + f'add weapon="{self.weapon_info.weapon}"', + f"refine={self.weapon_info.refinement}", + f"lvl={self.weapon_info.level}/{self.weapon_info.max_level}", + f"+params=[{','.join(self.weapon_info.params)}] " if self.weapon_info.params else "", + ], + ) + ) + + ";" + ) + + @property + def set_line(self) -> str: + return "\n".join( + " ".join( + filter( + lambda w: w, + [ + f"{self.char}", + f'add set="{set_info.set}"', + f"count={4 if set_info.count >= 4 else 2}", + f"+params=[{','.join(set_info.params)}] " if set_info.params else "", + ], + ) + ) + + ";" + for set_info in self.set_info + # NOTE: 祭*系列似乎并不支持 + if set_info.count > 1 + ) + + @property + def stats_line(self) -> str: + if all(value == 0 for _, value in self.stats): + return "" + return ( + f"{self.char} add stats " + + " ".join( + [ + f"{stat.replace('_PERCENT', '%').lower()}={value:.4f}" + if stat.endswith("_PERCENT") or stat in {"CR", "CD", "ER"} + else f"{stat.lower()}={value:.2f}" + for stat, value in iter(self.stats) + if value > 0 + ] + ) + + ";" + ) + + def __str__(self) -> str: + return "\n".join([self.char_line, self.weapon_line, self.set_line, self.stats_line]) + + +class GCSimTarget(BaseModel): + level: int = 100 + resist: float = 0.1 + position: Tuple[str, str] = ("0", "0") + interval: List[int] = [] + radius: Optional[float] = None + hp: Optional[int] = None + amount: Optional[int] = None + particle_threshold: Optional[int] = None + particle_drop_count: Optional[int] = None + others: Dict[str, Any] = {} + + def __str__(self) -> str: + return ( + " ".join( + filter( + lambda w: w, + [ + f"target lvl={self.level} resist={self.resist} ", + f"pos={','.join(self.position)}", + f"radius={self.radius}" if self.radius is not None else "", + f"hp={self.hp}" if self.hp is not None else "", + f"amount={self.amount}" if self.amount is not None else "", + f"interval={','.join(str(i) for i in self.interval)}" if self.interval else "", + f"particle_threshold={self.particle_threshold}" if self.particle_threshold is not None else "", + f"particle_drop_count={self.particle_drop_count}" + if self.particle_drop_count is not None + else "", + " ".join([f"{k}={v}" for k, v in self.others.items()]), + ], + ) + ) + + ";" + ) + + +class GCSimEnergySettings(BaseModel): + intervals: List[int] = [480, 720] + amount: int = 1 + + def __str__(self) -> str: + return f"energy every interval={','.join(str(i) for i in self.intervals)} amount={self.amount};" + + +class GCSim(BaseModel): + options: Optional[str] = None + characters: List[GCSimCharacterInfo] = [] + targets: List[GCSimTarget] = [GCSimTarget()] + energy_settings: Optional[GCSimEnergySettings] = None + # TODO: Do we even want this? + hurt_settings: Optional[str] = None + active_character: Optional[GCSimCharacter] = None + script_lines: List[str] = [] + + def __str__(self) -> str: + line = "" + if self.options: + line += f"{self.options};\n" + line += "\n".join([str(c) for c in self.characters]) + line += "\n" + line += "\n".join([str(t) for t in self.targets]) + line += "\n" + if self.energy_settings: + line += f"{self.energy_settings}\n" + if self.active_character: + line += f"active {self.active_character};\n" + else: + line += f"active {self.characters[0].char};\n" + line += "\n".join(self.script_lines) + line += "\n" + return line diff --git a/plugins/genshin/model/metadata.py b/plugins/genshin/model/metadata.py new file mode 100644 index 00000000..cdde2d8b --- /dev/null +++ b/plugins/genshin/model/metadata.py @@ -0,0 +1,31 @@ +from typing import Dict, Any + +from utils.const import PROJECT_ROOT + +try: + import ujson as jsonlib +except ImportError: + import json as jsonlib + +METADATA_PATH = PROJECT_ROOT.joinpath("metadata").joinpath("data") + + +class Metadata: + _instance: "Metadata" = None + weapon_metadata: Dict[str, Any] = {} + artifacts_metadata: Dict[str, Any] = {} + characters_metadata: Dict[str, Any] = {} + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance.reload_assets() + return cls._instance + + def reload_assets(self) -> None: + self.__load_assets_data() + + def __load_assets_data(self) -> None: + self.weapon_metadata = jsonlib.loads(METADATA_PATH.joinpath("weapon.json").read_text(encoding="utf-8")) + self.artifacts_metadata = jsonlib.loads(METADATA_PATH.joinpath("reliquary.json").read_text(encoding="utf-8")) + self.characters_metadata = jsonlib.loads(METADATA_PATH.joinpath("avatar.json").read_text(encoding="utf-8")) diff --git a/plugins/genshin/player_cards.py b/plugins/genshin/player_cards.py index b68d554f..067e633e 100644 --- a/plugins/genshin/player_cards.py +++ b/plugins/genshin/player_cards.py @@ -32,6 +32,7 @@ from core.services.players import PlayersService from core.services.template.services import TemplateService from metadata.shortname import roleToName, idToName from modules.apihelper.client.components.remote import Remote +from modules.gcsim.file import PlayerGCSimScripts from modules.playercards.file import PlayerCardsFile from modules.playercards.helpers import ArtifactStatsTheory from utils.enkanetwork import RedisCache, EnkaNetworkAPI @@ -78,6 +79,7 @@ class PlayerCards(Plugin): self.client = EnkaNetworkAPI(lang="chs", user_agent=config.enka_network_api_agent, cache=False) self.cache = RedisCache(redis.client, key="plugin:player_cards:enka_network", ex=60) self.player_cards_file = PlayerCardsFile() + self.player_gcsim_scripts = PlayerGCSimScripts() self.assets_service = assets_service self.template_service = template_service self.kitsune: Optional[str] = None @@ -210,15 +212,7 @@ class PlayerCards(Plugin): self.kitsune = reply_message.photo[-1].file_id return enka_response = EnkaNetworkResponse.parse_obj(copy.deepcopy(original_data)) - if character_name is not None: - logger.info( - "用户 %s[%s] 角色卡片查询命令请求 || character_name[%s] uid[%s]", - user.full_name, - user.id, - character_name, - uid, - ) - else: + if character_name is None: logger.info("用户 %s[%s] 角色卡片查询命令请求", user.full_name, user.id) ttl = await self.cache.ttl(uid) if enka_response.characters is None or len(enka_response.characters) == 0: @@ -244,6 +238,14 @@ class PlayerCards(Plugin): if reply_message.photo: self.kitsune = reply_message.photo[-1].file_id return + + logger.info( + "用户 %s[%s] 角色卡片查询命令请求 || character_name[%s] uid[%s]", + user.full_name, + user.id, + character_name, + uid, + ) for characters in enka_response.characters: if characters.name == character_name: break @@ -300,6 +302,7 @@ class PlayerCards(Plugin): await callback_query.answer("请先将角色加入到角色展柜并允许查看角色详情后再使用此功能,如果已经添加了角色,请等待角色数据更新后重试", show_alert=True) await message.delete() return + self.player_gcsim_scripts.remove_fits(uid) await callback_query.answer(text="正在从 Enka.Network 获取角色列表 请不要重复点击按钮") buttons = self.gen_button(data, user.id, uid, update_button=False) render_data = await self.parse_holder_data(data) diff --git a/pyproject.toml b/pyproject.toml index edcc8121..7f0fa35e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ pillow = "^10.0.1" playwright = "^1.28.0" aiosqlite = { extras = ["sqlite"], version = "^0.19.0" } simnet = { git = "https://github.com/PaiGramTeam/SIMNet" } +gcsim-pypi = "^2.8.2.10" [tool.poetry.extras] pyro = ["Pyrogram", "TgCrypto"] diff --git a/resources/bot/help/help.jinja2 b/resources/bot/help/help.jinja2 index 1917687e..798f892a 100644 --- a/resources/bot/help/help.jinja2 +++ b/resources/bot/help/help.jinja2 @@ -60,6 +60,13 @@
=a)}}for(var h=this.__startIndex;h t.unconstrainedWidth?null:d:null;i.setStyle("width",f)}var g=i.getBoundingRect();o.width=g.width;var y=(i.style.margin||0)+2.1;o.height=g.height+y,o.y-=(o.height-c)/2}}}function _M(t){return"center"===t.position}function bM(t){var e,n,i=t.getData(),r=[],o=!1,a=(t.get("minShowLabelAngle")||0)*vM,s=i.getLayout("viewRect"),l=i.getLayout("r"),u=s.width,h=s.x,c=s.y,p=s.height;function d(t){t.ignore=!0}i.each((function(t){var s=i.getItemGraphicEl(t),c=s.shape,p=s.getTextContent(),f=s.getTextGuideLine(),g=i.getItemModel(t),y=g.getModel("label"),v=y.get("position")||g.get(["emphasis","label","position"]),m=y.get("distanceToLabelLine"),x=y.get("alignTo"),_=Ur(y.get("edgeDistance"),u),b=y.get("bleedMargin"),w=g.getModel("labelLine"),S=w.get("length");S=Ur(S,u);var M=w.get("length2");if(M=Ur(M,u),Math.abs(c.endAngle-c.startAngle)0?"right":"left":k>0?"left":"right"}var B=Math.PI,F=0,G=y.get("rotate");if(j(G))F=G*(B/180);else if("center"===v)F=0;else if("radial"===G||!0===G){F=k<0?-A+B:-A}else if("tangential"===G&&"outside"!==v&&"outer"!==v){var W=Math.atan2(k,L);W<0&&(W=2*B+W),L>0&&(W=B+W),F=W-B}if(o=!!F,p.x=I,p.y=T,p.rotation=F,p.setStyle({verticalAlign:"middle"}),P){p.setStyle({align:D});var H=p.states.select;H&&(H.x+=p.x,H.y+=p.y)}else{var Y=p.getBoundingRect().clone();Y.applyTransform(p.getComputedTransform());var X=(p.style.margin||0)+2.1;Y.y-=X/2,Y.height+=X,r.push({label:p,labelLine:f,position:v,len:S,len2:M,minTurnAngle:w.get("minTurnAngle"),maxSurfaceAngle:w.get("maxSurfaceAngle"),surfaceNormal:new De(k,L),linePoints:C,textAlign:D,labelDistance:m,labelAlignTo:x,edgeDistance:_,bleedMargin:b,rect:Y,unconstrainedWidth:Y.width,labelStyleWidth:p.style.width})}s.setTextConfig({inside:P})}})),!o&&t.get("avoidLabelOverlap")&&function(t,e,n,i,r,o,a,s){for(var l=[],u=[],h=Number.MAX_VALUE,c=-Number.MAX_VALUE,p=0;p i&&(i=e);var o=i%2?i+2:i+3;r=[];for(var a=0;a5)return;var i=this._model.coordinateSystem.getSlidedAxisExpandWindow([t.offsetX,t.offsetY]);"none"!==i.behavior&&this._dispatchExpand({axisExpandWindow:i.axisExpandWindow})}this._mouseDownPoint=null},mousemove:function(t){if(!this._mouseDownPoint&&Mk(this,"mousemove")){var e=this._model,n=e.coordinateSystem.getSlidedAxisExpandWindow([t.offsetX,t.offsetY]),i=n.behavior;"jump"===i&&this._throttledDispatchExpand.debounceNextCall(e.get("axisExpandDebounce")),this._throttledDispatchExpand("none"===i?null:{axisExpandWindow:n.axisExpandWindow,animation:"jump"===i?null:{duration:0}})}}};function Mk(t,e){var n=t._model;return n.get("axisExpandable")&&n.get("axisExpandTriggerOn")===e}var Ik=function(t){function e(){var n=null!==t&&t.apply(this,arguments)||this;return n.type=e.type,n}return n(e,t),e.prototype.init=function(){t.prototype.init.apply(this,arguments),this.mergeOption({})},e.prototype.mergeOption=function(t){var e=this.option;t&&C(e,t,!0),this._initDimensions()},e.prototype.contains=function(t,e){var n=t.get("parallelIndex");return null!=n&&e.getComponent("parallel",n)===this},e.prototype.setAxisExpand=function(t){E(["axisExpandable","axisExpandCenter","axisExpandCount","axisExpandWidth","axisExpandWindow"],(function(e){t.hasOwnProperty(e)&&(this.option[e]=t[e])}),this)},e.prototype._initDimensions=function(){var t=this.dimensions=[],e=this.parallelAxisIndex=[];E(B(this.ecModel.queryComponents({mainType:"parallelAxis"}),(function(t){return(t.get("parallelIndex")||0)===this.componentIndex}),this),(function(n){t.push("dim"+n.get("dim")),e.push(n.componentIndex)}))},e.type="parallel",e.dependencies=["parallelAxis"],e.layoutMode="box",e.defaultOption={z:0,left:80,top:60,right:80,bottom:60,layout:"horizontal",axisExpandable:!1,axisExpandCenter:null,axisExpandCount:0,axisExpandWidth:50,axisExpandRate:17,axisExpandDebounce:50,axisExpandSlideTriggerArea:[-.15,.05,.4],axisExpandTriggerOn:"click",parallelAxisDefault:null},e}(Rp),Tk=function(t){function e(e,n,i,r,o){var a=t.call(this,e,n,i)||this;return a.type=r||"value",a.axisIndex=o,a}return n(e,t),e.prototype.isHorizontal=function(){return"horizontal"!==this.coordinateSystem.getModel().get("layout")},e}(nb);function Ck(t,e,n,i,r,o){t=t||0;var a=n[1]-n[0];if(null!=r&&(r=Ak(r,[0,a])),null!=o&&(o=Math.max(o,null!=r?r:0)),"all"===i){var s=Math.abs(e[1]-e[0]);s=Ak(s,[0,a]),r=o=Ak(s,[r,o]),i=0}e[0]=Ak(e[0],n),e[1]=Ak(e[1],n);var l=Dk(e,i);e[i]+=t;var u,h=r||0,c=n.slice();return l.sign<0?c[0]+=h:c[1]-=h,e[i]=Ak(e[i],c),u=Dk(e,i),null!=r&&(u.sign!==l.sign||u.span