diff --git a/modules/playercards/file.py b/modules/playercards/file.py index dcaf065e..29cc1f3a 100644 --- a/modules/playercards/file.py +++ b/modules/playercards/file.py @@ -62,10 +62,13 @@ class PlayerCardsFile: self, uid: Union[str, int], data: Dict, + use_old: bool = False, ) -> Dict: async with self._lock: old_data = await self.load_history_info(uid) if old_data is None: + if use_old: + raise FileNotFoundError await self.save_json(self.get_file_path(uid), data) return data data["avatarInfoList"] = data.get("avatarInfoList") or [] @@ -73,5 +76,8 @@ class PlayerCardsFile: for i in old_data["avatarInfoList"]: if i.get("avatarId", 0) not in characters: data["avatarInfoList"].append(i) + if use_old: + old_data["avatarInfoList"] = data["avatarInfoList"] + data = old_data await self.save_json(self.get_file_path(uid), data) return data diff --git a/modules/playercards/to_enka.py b/modules/playercards/to_enka.py new file mode 100644 index 00000000..b7b98477 --- /dev/null +++ b/modules/playercards/to_enka.py @@ -0,0 +1,227 @@ +from typing import TYPE_CHECKING, Dict, List, Any, Optional + +from enkanetwork import Assets, CharacterStats + +from utils.log import logger + +if TYPE_CHECKING: + from simnet.models.genshin.chronicle.character_detail import ( + GenshinDetailCharacters, + GenshinDetailCharacter, + PropertyValue, + DetailArtifact, + ArtifactProperty, + DetailCharacterWeapon, + ) + + +class HashMapRev: + HASH_MAP_REV: Dict[str, Dict[str, str]] = {} + + @classmethod + def get_hash_map(cls, name: str) -> Optional[str]: + cls.reload_assets() + for key in cls.HASH_MAP_REV: + if name in cls.HASH_MAP_REV[key]: + return cls.HASH_MAP_REV[key][name] + return "" + + @classmethod + def get_artifacts_data(cls, artifact_id: int) -> Dict: + cls.reload_assets() + return Assets.DATA["artifacts"][str(artifact_id)] + + @classmethod + def get_weapon_data(cls, weapon_id: int) -> Dict: + cls.reload_assets() + return Assets.DATA["weapons"][str(weapon_id)] + + @classmethod + def reload_assets(cls) -> None: + # Load assets + if not Assets.HASH_MAP: + Assets.reload_assets() + if not cls.HASH_MAP_REV: + cls.HASH_MAP_REV = {key: {v["CHS"]: k for k, v in value.items()} for key, value in Assets.HASH_MAP.items()} + + +def get_prop_name_from_id(prop_id: int) -> str: + for k, v in CharacterStats.__fields__.items(): + if v.default.id == prop_id: + return k + return "" + + +def get_equip_list_single_artifact_stats(data: "DetailArtifact") -> Dict: + main_stat = data.main_property + sub_stats = data.sub_property_list + + def _get_stat(v: "ArtifactProperty", key: str) -> Dict: + return { + key: get_prop_name_from_id(v.property_type), + "statValue": float(v.value.replace("%", "")), + } + + return { + "reliquaryMainstat": _get_stat(main_stat, "mainPropId"), + "reliquarySubstats": [_get_stat(v, "appendPropId") for v in sub_stats], + } + + +def get_equip_list_single_weapon_stats(data: "DetailCharacterWeapon") -> List[Dict]: + stats = [data.main_property] + if data.sub_property: + stats.append(data.sub_property) + + def _get_stat(v: "PropertyValue") -> Dict: + return { + "appendPropId": get_prop_name_from_id(v.property_type), + "statValue": float(v.final.replace("%", "")), + } + + return [_get_stat(v) for v in stats] + + +def get_equip_list_single_artifact(data: "DetailArtifact") -> Dict: + item_id = data.id + item_data = HashMapRev.get_artifacts_data(item_id) + reliquary = { + "appendPropIdList": [], + "level": data.level + 1, + } + flat = { + "equipType": item_data["equipType"], + "icon": item_data["icon"], + "itemType": item_data["itemType"], + "nameTextMapHash": str(item_data["nameTextMapHash"]), + "rankLevel": item_data["rankLevel"], + "setNameTextMapHash": HashMapRev.get_hash_map(data.set.name), + **get_equip_list_single_artifact_stats(data), + } + return { + "flat": flat, + "itemId": item_id, + "reliquary": reliquary, + } + + +def get_equip_list_single_weapon(data: "DetailCharacterWeapon") -> Dict: + item_id = data.id + item_data = HashMapRev.get_weapon_data(item_id) + weapon = { + "affixMap": {"0": data.refinement - 1}, + "level": data.level, + "promoteLevel": data.ascension, + } + flat = { + "icon": item_data["icon"], + "itemType": "ITEM_WEAPON", + "nameTextMapHash": str(item_data["nameTextMapHash"]), + "rankLevel": item_data["rankLevel"], + "weaponStats": get_equip_list_single_weapon_stats(data), + } + return { + "flat": flat, + "itemId": item_id, + "weapon": weapon, + } + + +def get_equip_list_single(index: int, data: "GenshinDetailCharacter") -> Dict: + if index >= len(data.artifacts): + return get_equip_list_single_weapon(data.weapon) + return get_equip_list_single_artifact(data.artifacts[index]) + + +def get_equip_list_loop(data: "GenshinDetailCharacter") -> List[Dict]: + return [get_equip_list_single(index, data) for index in range(len(data.artifacts) + 1)] + + +def get_fetter_info(data: "GenshinDetailCharacter") -> Dict[str, int]: + return {"expLevel": data.base.friendship} + + +def get_fight_prop_map(data: "GenshinDetailCharacter") -> Dict[str, float]: + f = [] + f.extend(data.base_properties) + f.extend(data.extra_properties) + f.extend(data.element_properties) + f.sort(key=lambda k: k.property_type) + + def _prop_to_value(v: "PropertyValue") -> float: + if "%" not in v.final: + return float(v.final) + return float(v.final.replace("%", "")) / 100 + + return {str(prop.property_type): _prop_to_value(prop) for prop in f} + + +def get_inherent_proud_skill_list(data: "GenshinDetailCharacter") -> List[int]: + return [(skill.id * 100 + 1) for skill in data.skills if skill.skill_type == 2] + + +def get_prop_map(data: "GenshinDetailCharacter") -> Dict[str, Dict[str, Any]]: + level = str(data.base.level) + return { + "1001": {"ival": "0", "type": 1001}, # XP + "1002": {"ival": "6", "type": 1002, "val": "6"}, # Ascension + "4001": {"ival": level, "type": 4001, "val": level}, # Level + } + + +def get_skill_depot_id(data: "GenshinDetailCharacter") -> int: + skill = data.skills[0] + skill_id = skill.id % 10 + if data.base.id in [10000005, 10000007]: + skill_id += 1 + skill_id_pre = (data.base.id - 10000000) * 100 + return skill_id_pre + skill_id + + +def get_skill_level_map(data: "GenshinDetailCharacter") -> Dict: + return {str(skill.id): skill.level for skill in data.skills if skill.skill_type == 1} + + +def get_talent_id_list(data: "GenshinDetailCharacter") -> List[int]: + return [constellation.id for constellation in data.constellations if constellation.activated] + + +def from_simnet_to_enka_single(index: int, data: "GenshinDetailCharacters") -> Dict: + character = data.characters[index] + avatar_id = character.base.id + equip_list = get_equip_list_loop(character) + fetter_info = get_fetter_info(character) + fight_prop_map = get_fight_prop_map(character) + inherent_proud_skill_list = get_inherent_proud_skill_list(character) + prop_map = get_prop_map(character) + skill_depot_id = get_skill_depot_id(character) + skill_level_map = get_skill_level_map(character) + talent_id_list = get_talent_id_list(character) + return { + "avatarId": avatar_id, + "equipList": equip_list, + "fetterInfo": fetter_info, + "propMap": prop_map, + "talentIdList": talent_id_list, + "skillDepotId": skill_depot_id, + "inherentProudSkillList": inherent_proud_skill_list, + "fightPropMap": fight_prop_map, + "skillLevelMap": skill_level_map, + } + + +def from_simnet_to_enka_loop(data: "GenshinDetailCharacters") -> List[Dict]: + d = [] + for index, ch in enumerate(data.characters): + try: + d.append(from_simnet_to_enka_single(index, data)) + except Exception as e: + cid = ch.base.id + logger.error("从 simnet 模型转换为 enka 模型时出现错误 cid[%s]", cid, exc_info=e) + return d + + +def from_simnet_to_enka(data: "GenshinDetailCharacters") -> Dict: + return { + "avatarInfoList": from_simnet_to_enka_loop(data), + } diff --git a/plugins/genshin/player_cards.py b/plugins/genshin/player_cards.py index dae2cd53..8b5eab1d 100644 --- a/plugins/genshin/player_cards.py +++ b/plugins/genshin/player_cards.py @@ -34,7 +34,8 @@ from modules.apihelper.client.components.remote import Remote from modules.gcsim.file import PlayerGCSimScripts from modules.playercards.file import PlayerCardsFile from modules.playercards.helpers import ArtifactStatsTheory -from plugins.tools.genshin import PlayerNotFoundError +from modules.playercards.to_enka import from_simnet_to_enka +from plugins.tools.genshin import PlayerNotFoundError, GenshinHelper, CookiesNotFoundError from utils.enkanetwork import RedisCache, EnkaNetworkAPI from utils.helpers import download_resource from utils.log import logger @@ -60,6 +61,7 @@ if TYPE_CHECKING: from enkanetwork import CharacterInfo, EquipmentsStats from telegram.ext import ContextTypes from telegram import Update, Message + from simnet import GenshinClient try: import ujson as jsonlib @@ -74,6 +76,7 @@ class PlayerCards(Plugin): template_service: TemplateService, assets_service: AssetsService, redis: RedisDB, + helper: GenshinHelper, ): self.player_service = player_service self.client = EnkaNetworkAPI(lang="chs", user_agent=config.enka_network_api_agent, cache=False) @@ -85,6 +88,7 @@ class PlayerCards(Plugin): self.kitsune: Optional[str] = None self.fight_prop_rule: Dict[str, Dict[str, float]] = {} self.damage_config: Dict = {} + self.helper = helper async def initialize(self): await self._refresh() @@ -122,6 +126,26 @@ class PlayerCards(Plugin): error = "Enka.Network HTTP 服务请求错误,请稍后重试" return error + async def _update_mihoyo_data(self, user_id: int, uid: int) -> Union[EnkaNetworkResponse, str]: + error = "发生未知错误" + try: + data = await self.cache.get(str(uid)) + if data is not None: + return EnkaNetworkResponse.parse_obj(data) + async with self.helper.genshin(user_id=user_id, player_id=uid) as client: + client: "GenshinClient" + ids = await client.get_genshin_character_list() + raw_details = await client.get_genshin_character_detail([c.id for c in ids]) + data = from_simnet_to_enka(raw_details) + data = await self.player_cards_file.merge_info(uid, data, use_old=True) + await self.cache.set(str(uid), data) + return EnkaNetworkResponse.parse_obj(data) + except FileNotFoundError: + error = "请先通过 Enka.Network 更新一次角色列表" + except (PlayerNotFoundError, CookiesNotFoundError): + error = "请先通过 cookie 绑定账号" + return error + async def _load_data_as_enka_response(self, uid) -> Optional[EnkaNetworkResponse]: data = await self.player_cards_file.load_history_info(uid) if data is None: @@ -192,8 +216,8 @@ class PlayerCards(Plugin): buttons = [ [ InlineKeyboardButton( - "更新面板", - callback_data=f"update_player_card|{user_id}|{uid}", + "更新", + callback_data=f"update_player_card|{user_id}|{uid}|enka", ) ] ] @@ -213,8 +237,8 @@ class PlayerCards(Plugin): buttons = [ [ InlineKeyboardButton( - "更新面板", - callback_data=f"update_player_card|{user_id}|{uid}", + "更新", + callback_data=f"update_player_card|{user_id}|{uid}|enka", ) ] ] @@ -272,14 +296,15 @@ class PlayerCards(Plugin): message = update.effective_message callback_query = update.callback_query - async def get_player_card_callback(callback_query_data: str) -> Tuple[int, int]: + async def get_player_card_callback(callback_query_data: str) -> Tuple[int, int, str]: _data = callback_query_data.split("|") _user_id = int(_data[1]) _uid = int(_data[2]) - logger.debug("callback_query_data函数返回 user_id[%s] uid[%s]", _user_id, _uid) - return _user_id, _uid + _type = _data[3] if len(_data) > 3 else "enka" + logger.debug("callback_query_data函数返回 user_id[%s] uid[%s] type[%s]", _user_id, _uid, _type) + return _user_id, _uid, _type - user_id, uid = await get_player_card_callback(callback_query.data) + user_id, uid, update_type = await get_player_card_callback(callback_query.data) if user.id != user_id: await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) return @@ -291,7 +316,12 @@ class PlayerCards(Plugin): return await message.reply_chat_action(ChatAction.TYPING) - data = await self._update_enka_data(uid) + if update_type == "enka": + data = await self._update_enka_data(uid) + text = "正在从 Enka.Network 获取角色列表 请不要重复点击按钮" + else: + data = await self._update_mihoyo_data(user_id, uid) + text = "正在从米忽悠获取角色列表 请不要重复点击按钮" if isinstance(data, str): await callback_query.answer(text=data, show_alert=True) return @@ -303,7 +333,7 @@ class PlayerCards(Plugin): await message.delete() return self.player_gcsim_scripts.remove_fits(uid) - await callback_query.answer(text="正在从 Enka.Network 获取角色列表 请不要重复点击按钮") + await callback_query.answer(text=text) buttons = self.gen_button(data, user.id, uid, update_button=False) render_data = await self.parse_holder_data(data) holder = await self.template_service.render( @@ -436,8 +466,14 @@ class PlayerCards(Plugin): if update_button: last_button.append( InlineKeyboardButton( - "更新面板", - callback_data=f"update_player_card|{user_id}|{uid}", + "更新", + callback_data=f"update_player_card|{user_id}|{uid}|enka", + ) + ) + last_button.append( + InlineKeyboardButton( + "更新全部", + callback_data=f"update_player_card|{user_id}|{uid}|mihoyo", ) ) if next_page: @@ -526,6 +562,25 @@ class Artifact(BaseModel): return mapping.get(label, "text-neutral-400") +class DamageResultNew(BaseModel): + + critical: float + non_critical: float + expectation: float + is_heal: bool + is_shield: bool + + @classmethod + def parse_from(cls, value) -> "DamageResultNew": + return cls( + critical=value.critical, + non_critical=value.non_critical, + expectation=value.expectation, + is_heal=value.is_heal, + is_shield=value.is_shield, + ) + + class RenderTemplate: def __init__( self, @@ -592,6 +647,9 @@ class RenderTemplate: if damage_config is not None: try: data["damage_info"] = self.render_damage(damage_config) + for damage in data["damage_info"]: + if damage["damage"] is not None: + damage["damage"] = DamageResultNew.parse_from(damage["damage"]) except JsonParseException as _exc: logger.error(str(_exc)) except EnkaParseException as _exc: @@ -689,7 +747,7 @@ class RenderTemplate: elif stat[1].id != 26: # 治疗加成 continue value = stat[1].to_rounded() if isinstance(stat[1], Stats) else stat[1].to_percentage_symbol() - if value in ("0%", 0): + if value in ("0%", "0.0%", 0): continue name = DEFAULT_EnkaAssets.get_hash_map(stat[0]) if name is None: