diff --git a/modules/apihelper/client/components/remote.py b/modules/apihelper/client/components/remote.py index 76ce43da..c307f274 100644 --- a/modules/apihelper/client/components/remote.py +++ b/modules/apihelper/client/components/remote.py @@ -1,4 +1,4 @@ -from typing import List, Dict +from typing import List, Dict, Any from httpx import AsyncClient @@ -14,6 +14,7 @@ class Remote: BIRTHDAY = f"{BASE_URL}birthday.json" MATERIAL = f"{BASE_URL}roles_material.json" RULE = f"{RESOURCE_FightPropRule_URL}FightPropRule_genshin.json" + DAMAGE = f"{RESOURCE_FightPropRule_URL}GenshinDamageRule.json" @staticmethod async def get_remote_calendar() -> Dict[str, Dict]: @@ -24,7 +25,7 @@ class Remote: if req.status_code == 200: return req.json() return {} - except Exception as exc: + except Exception as exc: # skipcq: PYL-W0703 logger.error("获取云端日历失败: %s", exc_info=exc) return {} @@ -37,7 +38,7 @@ class Remote: if req.status_code == 200: return req.json() return {} - except Exception as exc: + except Exception as exc: # skipcq: PYL-W0703 logger.error("获取云端生日失败: %s", exc_info=exc) return {} @@ -50,7 +51,7 @@ class Remote: if req.status_code == 200: return req.json() return {} - except Exception as exc: + except Exception as exc: # skipcq: PYL-W0703 logger.error("获取云端角色材料失败: %s", exc_info=exc) return {} @@ -63,6 +64,19 @@ class Remote: if req.status_code == 200: return req.json() return {} - except Exception as exc: + except Exception as exc: # skipcq: PYL-W0703 logger.error("获取云端圣遗物评分规则失败: %s", exc_info=exc) return {} + + @staticmethod + async def get_damage_data() -> Dict[str, Any]: + """获取云端伤害计算规则""" + try: + async with AsyncClient() as client: + req = await client.get(Remote.DAMAGE) + if req.status_code == 200: + return req.json() + return {} + except Exception as exc: # skipcq: PYL-W0703 + logger.error("获取云端伤害计算规则失败: %s", exc_info=exc) + return {} diff --git a/plugins/genshin/player_cards.py b/plugins/genshin/player_cards.py index 009cbbc5..2e95540d 100644 --- a/plugins/genshin/player_cards.py +++ b/plugins/genshin/player_cards.py @@ -1,3 +1,4 @@ +import copy import math from typing import Any, List, Tuple, Union, Optional, TYPE_CHECKING, Dict @@ -20,17 +21,16 @@ from enkanetwork import ( from pydantic import BaseModel from telegram import InlineKeyboardButton, InlineKeyboardMarkup from telegram.constants import ChatAction -from telegram.ext import CommandHandler, MessageHandler, filters +from telegram.ext import filters from telegram.helpers import create_deep_linked_url from core.config import config from core.dependence.assets import DEFAULT_EnkaAssets, AssetsService from core.dependence.redisdb import RedisDB -from core.handler.callbackqueryhandler import CallbackQueryHandler from core.plugin import Plugin, handler from core.services.players import PlayersService from core.services.template.services import TemplateService -from metadata.shortname import roleToName +from metadata.shortname import roleToName, idToName from modules.apihelper.client.components.remote import Remote from modules.playercards.file import PlayerCardsFile from modules.playercards.helpers import ArtifactStatsTheory @@ -39,10 +39,28 @@ from utils.helpers import download_resource from utils.log import logger from utils.uid import mask_number +try: + from python_genshin_artifact.calculator import get_damage_analysis + from python_genshin_artifact.enka.characters import characters_map + from python_genshin_artifact.enka.enka_parser import enka_parser + from python_genshin_artifact.models.calculator import CalculatorConfig + from python_genshin_artifact.models.skill import SkillInfo + + GENSHIN_ARTIFACT_FUNCTION_AVAILABLE = True +except ImportError as exc: + get_damage_analysis = None + characters_map = {} + enka_parser = None + CalculatorConfig = None + SkillInfo = None + Assets = None + + GENSHIN_ARTIFACT_FUNCTION_AVAILABLE = False + if TYPE_CHECKING: from enkanetwork import CharacterInfo, EquipmentsStats from telegram.ext import ContextTypes - from telegram import Update + from telegram import Update, Message try: import ujson as jsonlib @@ -66,12 +84,14 @@ class PlayerCards(Plugin): self.template_service = template_service self.kitsune: Optional[str] = None self.fight_prop_rule: Dict[str, Dict[str, float]] = {} + self.damage_config: Dict = {} async def initialize(self): await self._refresh() async def _refresh(self): self.fight_prop_rule = await Remote.get_fight_prop_rule_data() + self.damage_config = await Remote.get_damage_data() async def _update_enka_data(self, uid) -> Union[EnkaNetworkResponse, str]: try: @@ -102,21 +122,51 @@ class PlayerCards(Plugin): error = "Enka.Network HTTP 服务请求错误,请稍后重试" return error - async def _load_history(self, uid) -> Optional[EnkaNetworkResponse]: + async def _load_data_as_enka_response(self, uid) -> Optional[EnkaNetworkResponse]: data = await self.player_cards_file.load_history_info(uid) if data is None: return None return EnkaNetworkResponse.parse_obj(data) - @handler(CommandHandler, command="player_card", block=False) - @handler(MessageHandler, filters=filters.Regex("^角色卡片查询(.*)"), block=False) + async def _load_history(self, uid) -> Optional[Dict]: + return await self.player_cards_file.load_history_info(uid) + + async def get_uid_and_ch( + self, user_id: int, args: List[str], reply: Optional["Message"] + ) -> Tuple[Optional[int], Optional[str]]: + """通过消息获取 uid,优先级:args > reply > self""" + uid, ch_name, user_id_ = None, None, user_id + if args: + for i in args: + if i is not None: + if i.isdigit() and len(i) == 9: + uid = int(i) + else: + ch_name = roleToName(i) + if reply: + try: + user_id_ = reply.from_user.id + except AttributeError: + pass + if not uid: + player_info = await self.player_service.get_player(user_id_) + if player_info is not None: + uid = player_info.player_id + if (not uid) and (user_id_ != user_id): + player_info = await self.player_service.get_player(user_id) + if player_info is not None: + uid = player_info.player_id + return uid, ch_name + + @handler.command(command="player_card", block=False) + @handler.message(filters=filters.Regex("^角色卡片查询(.*)"), block=False) async def player_cards(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: user = update.effective_user message = update.effective_message args = self.get_args(context) await message.reply_chat_action(ChatAction.TYPING) - player_info = await self.player_service.get_player(user.id) - if player_info is None: + uid, character_name = await self.get_uid_and_ch(user.id, args, message.reply_to_message) + if uid is None: buttons = [ [ InlineKeyboardButton( @@ -136,8 +186,8 @@ class PlayerCards(Plugin): else: await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons)) return - data = await self._load_history(player_info.player_id) - if data is None: + original_data = await self._load_history(uid) + if original_data is None or len(original_data.get("avatarInfoList", [])) == 0: if isinstance(self.kitsune, str): photo = self.kitsune else: @@ -146,7 +196,7 @@ class PlayerCards(Plugin): [ InlineKeyboardButton( "更新面板", - callback_data=f"update_player_card|{user.id}|{player_info.player_id}", + callback_data=f"update_player_card|{user.id}|{uid}", ) ] ] @@ -158,29 +208,29 @@ class PlayerCards(Plugin): if reply_message.photo: self.kitsune = reply_message.photo[-1].file_id return - if len(args) == 1: - character_name = roleToName(args[0]) + enka_response = EnkaNetworkResponse.parse_obj(copy.deepcopy(original_data)) + if character_name is not None: logger.info( "用户 %s[%s] 角色卡片查询命令请求 || character_name[%s] uid[%s]", user.full_name, user.id, character_name, - player_info.player_id, + uid, ) else: logger.info("用户 %s[%s] 角色卡片查询命令请求", user.full_name, user.id) - ttl = await self.cache.ttl(player_info.player_id) - if data.characters is None or len(data.characters) == 0: + ttl = await self.cache.ttl(uid) + if enka_response.characters is None or len(enka_response.characters) == 0: buttons = [ [ InlineKeyboardButton( "更新面板", - callback_data=f"update_player_card|{user.id}|{player_info.player_id}", + callback_data=f"update_player_card|{user.id}|{uid}", ) ] ] else: - buttons = self.gen_button(data, user.id, player_info.player_id, update_button=ttl < 0) + buttons = self.gen_button(enka_response, user.id, uid, update_button=ttl < 0) if isinstance(self.kitsune, str): photo = self.kitsune else: @@ -193,22 +243,30 @@ class PlayerCards(Plugin): if reply_message.photo: self.kitsune = reply_message.photo[-1].file_id return - for characters in data.characters: + for characters in enka_response.characters: if characters.name == character_name: break else: await message.reply_text(f"角色展柜中未找到 {character_name} ,请检查角色是否存在于角色展柜中,或者等待角色数据更新后重试") return await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) + original_data: Optional[Dict] = None + if GENSHIN_ARTIFACT_FUNCTION_AVAILABLE: + original_data = await self._load_history(uid) render_result = await RenderTemplate( - player_info.player_id, characters, self.fight_prop_rule, self.template_service + uid, + characters, + self.fight_prop_rule, + self.damage_config, + self.template_service, + original_data, ).render() # pylint: disable=W0631 await render_result.reply_photo( message, - filename=f"player_card_{player_info.player_id}_{character_name}.png", + filename=f"player_card_{uid}_{character_name}.png", ) - @handler(CallbackQueryHandler, pattern=r"^update_player_card\|", block=False) + @handler.callback_query(pattern=r"^update_player_card\|", block=False) async def update_player_card(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: user = update.effective_user message = update.effective_message @@ -253,7 +311,7 @@ class PlayerCards(Plugin): ) await holder.edit_media(message, reply_markup=InlineKeyboardMarkup(buttons)) - @handler(CallbackQueryHandler, pattern=r"^get_player_card\|", block=False) + @handler.callback_query(pattern=r"^get_player_card\|", block=False) async def get_player_cards(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None: callback_query = update.callback_query user = callback_query.from_user @@ -299,20 +357,18 @@ class PlayerCards(Plugin): result, uid, ) - data = await self._load_history(uid) - if isinstance(data, str): - await message.reply_text(data) - return - if data.characters is None or len(data.characters) == 0: + original_data = await self._load_history(uid) + enka_response = EnkaNetworkResponse.parse_obj(copy.deepcopy(original_data)) + if enka_response.characters is None or len(enka_response.characters) == 0: await callback_query.answer("请先将角色加入到角色展柜并允许查看角色详情后再使用此功能,如果已经添加了角色,请等待角色数据更新后重试", show_alert=True) await message.delete() return if page: - buttons = self.gen_button(data, user.id, uid, page, not await self.cache.ttl(uid) > 0) + buttons = self.gen_button(enka_response, user.id, uid, page, await self.cache.ttl(uid) <= 0) await message.edit_reply_markup(reply_markup=InlineKeyboardMarkup(buttons)) await callback_query.answer(f"已切换到第 {page} 页", show_alert=False) return - for characters in data.characters: + for characters in enka_response.characters: if characters.name == result: break else: @@ -322,7 +378,7 @@ class PlayerCards(Plugin): await callback_query.answer(text="正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False) await message.reply_chat_action(ChatAction.UPLOAD_PHOTO) render_result = await RenderTemplate( - uid, characters, self.fight_prop_rule, self.template_service + uid, characters, self.fight_prop_rule, self.damage_config, self.template_service, original_data ).render() # pylint: disable=W0631 render_result.filename = f"player_card_{uid}_{result}.png" await render_result.edit_media(message) @@ -465,13 +521,17 @@ class RenderTemplate: uid: Union[int, str], character: "CharacterInfo", fight_prop_rule: Dict[str, Dict[str, float]], - template_service: TemplateService = None, + damage_config: Dict, + template_service: TemplateService, + original_data: Optional[Dict] = None, ): self.uid = uid self.template_service = template_service # 因为需要替换线上 enka 图片地址为本地地址,先克隆数据,避免修改原数据 self.character = character.copy(deep=True) self.fight_prop_rule = fight_prop_rule + self.original_data = original_data + self.damage_config = damage_config async def render(self): # 缓存所有图片到本地 @@ -511,17 +571,66 @@ class RenderTemplate: "artifacts": artifacts, # 需要在模板中使用的 enum 类型 "DigitType": DigitType, + "damage_function_available": False, + "damage_info": [], } + if GENSHIN_ARTIFACT_FUNCTION_AVAILABLE: + character_cn_name = idToName(self.character.id) + damage_config = self.damage_config.get(character_cn_name) + if damage_config is not None: + data["damage_function_available"] = True + data["damage_info"] = self.render_damage(damage_config) + return await self.template_service.render( "genshin/player_card/player_card.jinja2", data, - {"width": 950, "height": 1080}, full_page=True, query_selector=".text-neutral-200", ttl=7 * 24 * 60 * 60, ) + def render_damage(self, damage_config: Optional[Dict]) -> List: + character, weapon, artifacts = enka_parser(self.original_data, self.character.id) + character_name = character.name + character_cn_name = idToName(self.character.id) + if damage_config is None: + damage_config = self.damage_config.get(character_cn_name) + skills = damage_config.get("skills") + config_skill = damage_config.get("config_skill") + if config_skill is not None: + config_skill = {character_name: config_skill} + else: + config_skill = "NoConfig" + character_config = damage_config.get("config") + artifact_config = damage_config.get("artifact_config") + if character_config is not None: + character.params = {character_name: character_config} + config_weapon = damage_config.get("config_weapon") + if config_weapon is not None: + _weapon_config = config_weapon.get(weapon.name) + if _weapon_config is not None: + weapon.params = {weapon.name: _weapon_config} + damage = [] + for skill in skills: + index = skill.get("index") + skill_info = SkillInfo(index=index, config=config_skill) + calculator_config = CalculatorConfig( + character=character, + weapon=weapon, + artifacts=artifacts, + skill=skill_info, + artifact_config=artifact_config, + ) + damage_analysis = get_damage_analysis(calculator_config) + damage_key = skill.get("damage_key") + damage_value = getattr(damage_analysis, damage_key) + if damage_value is not None: + damage_info = {"damage": damage_value, "skill_info": skill} + damage.append(damage_info) + + return damage + async def de_stats(self) -> List[Tuple[str, Any]]: stats = self.character.stats items: List[Tuple[str, Any]] = [] diff --git a/resources/genshin/player_card/damage.jinja2 b/resources/genshin/player_card/damage.jinja2 new file mode 100644 index 00000000..f9920767 --- /dev/null +++ b/resources/genshin/player_card/damage.jinja2 @@ -0,0 +1,18 @@ +
技能 | +暴击伤害 | +期望伤害 | +
---|---|---|
{{ item.skill_info.name }} | +{{ item.damage.critical|round|int }} | +{{ item.damage.expectation|round|int }} | +