Support detailed Genshin character endpoint when refresh player_card

This commit is contained in:
omg-xtao 2024-08-22 23:00:28 +08:00 committed by GitHub
parent 3e5b3e47ee
commit c41d9170b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 305 additions and 14 deletions

View File

@ -62,10 +62,13 @@ class PlayerCardsFile:
self, self,
uid: Union[str, int], uid: Union[str, int],
data: Dict, data: Dict,
use_old: bool = False,
) -> Dict: ) -> Dict:
async with self._lock: async with self._lock:
old_data = await self.load_history_info(uid) old_data = await self.load_history_info(uid)
if old_data is None: if old_data is None:
if use_old:
raise FileNotFoundError
await self.save_json(self.get_file_path(uid), data) await self.save_json(self.get_file_path(uid), data)
return data return data
data["avatarInfoList"] = data.get("avatarInfoList") or [] data["avatarInfoList"] = data.get("avatarInfoList") or []
@ -73,5 +76,8 @@ class PlayerCardsFile:
for i in old_data["avatarInfoList"]: for i in old_data["avatarInfoList"]:
if i.get("avatarId", 0) not in characters: if i.get("avatarId", 0) not in characters:
data["avatarInfoList"].append(i) data["avatarInfoList"].append(i)
if use_old:
old_data["avatarInfoList"] = data["avatarInfoList"]
data = old_data
await self.save_json(self.get_file_path(uid), data) await self.save_json(self.get_file_path(uid), data)
return data return data

View File

@ -0,0 +1,227 @@
from typing import TYPE_CHECKING, Dict, List, Any, Optional
from enkanetwork import Assets, CharacterStats
from utils.log import logger
if TYPE_CHECKING:
from simnet.models.genshin.chronicle.character_detail import (
GenshinDetailCharacters,
GenshinDetailCharacter,
PropertyValue,
DetailArtifact,
ArtifactProperty,
DetailCharacterWeapon,
)
class HashMapRev:
HASH_MAP_REV: Dict[str, Dict[str, str]] = {}
@classmethod
def get_hash_map(cls, name: str) -> Optional[str]:
cls.reload_assets()
for key in cls.HASH_MAP_REV:
if name in cls.HASH_MAP_REV[key]:
return cls.HASH_MAP_REV[key][name]
return ""
@classmethod
def get_artifacts_data(cls, artifact_id: int) -> Dict:
cls.reload_assets()
return Assets.DATA["artifacts"][str(artifact_id)]
@classmethod
def get_weapon_data(cls, weapon_id: int) -> Dict:
cls.reload_assets()
return Assets.DATA["weapons"][str(weapon_id)]
@classmethod
def reload_assets(cls) -> None:
# Load assets
if not Assets.HASH_MAP:
Assets.reload_assets()
if not cls.HASH_MAP_REV:
cls.HASH_MAP_REV = {key: {v["CHS"]: k for k, v in value.items()} for key, value in Assets.HASH_MAP.items()}
def get_prop_name_from_id(prop_id: int) -> str:
for k, v in CharacterStats.__fields__.items():
if v.default.id == prop_id:
return k
return ""
def get_equip_list_single_artifact_stats(data: "DetailArtifact") -> Dict:
main_stat = data.main_property
sub_stats = data.sub_property_list
def _get_stat(v: "ArtifactProperty", key: str) -> Dict:
return {
key: get_prop_name_from_id(v.property_type),
"statValue": float(v.value.replace("%", "")),
}
return {
"reliquaryMainstat": _get_stat(main_stat, "mainPropId"),
"reliquarySubstats": [_get_stat(v, "appendPropId") for v in sub_stats],
}
def get_equip_list_single_weapon_stats(data: "DetailCharacterWeapon") -> List[Dict]:
stats = [data.main_property]
if data.sub_property:
stats.append(data.sub_property)
def _get_stat(v: "PropertyValue") -> Dict:
return {
"appendPropId": get_prop_name_from_id(v.property_type),
"statValue": float(v.final.replace("%", "")),
}
return [_get_stat(v) for v in stats]
def get_equip_list_single_artifact(data: "DetailArtifact") -> Dict:
item_id = data.id
item_data = HashMapRev.get_artifacts_data(item_id)
reliquary = {
"appendPropIdList": [],
"level": data.level + 1,
}
flat = {
"equipType": item_data["equipType"],
"icon": item_data["icon"],
"itemType": item_data["itemType"],
"nameTextMapHash": str(item_data["nameTextMapHash"]),
"rankLevel": item_data["rankLevel"],
"setNameTextMapHash": HashMapRev.get_hash_map(data.set.name),
**get_equip_list_single_artifact_stats(data),
}
return {
"flat": flat,
"itemId": item_id,
"reliquary": reliquary,
}
def get_equip_list_single_weapon(data: "DetailCharacterWeapon") -> Dict:
item_id = data.id
item_data = HashMapRev.get_weapon_data(item_id)
weapon = {
"affixMap": {"0": data.refinement - 1},
"level": data.level,
"promoteLevel": data.ascension,
}
flat = {
"icon": item_data["icon"],
"itemType": "ITEM_WEAPON",
"nameTextMapHash": str(item_data["nameTextMapHash"]),
"rankLevel": item_data["rankLevel"],
"weaponStats": get_equip_list_single_weapon_stats(data),
}
return {
"flat": flat,
"itemId": item_id,
"weapon": weapon,
}
def get_equip_list_single(index: int, data: "GenshinDetailCharacter") -> Dict:
if index >= len(data.artifacts):
return get_equip_list_single_weapon(data.weapon)
return get_equip_list_single_artifact(data.artifacts[index])
def get_equip_list_loop(data: "GenshinDetailCharacter") -> List[Dict]:
return [get_equip_list_single(index, data) for index in range(len(data.artifacts) + 1)]
def get_fetter_info(data: "GenshinDetailCharacter") -> Dict[str, int]:
return {"expLevel": data.base.friendship}
def get_fight_prop_map(data: "GenshinDetailCharacter") -> Dict[str, float]:
f = []
f.extend(data.base_properties)
f.extend(data.extra_properties)
f.extend(data.element_properties)
f.sort(key=lambda k: k.property_type)
def _prop_to_value(v: "PropertyValue") -> float:
if "%" not in v.final:
return float(v.final)
return float(v.final.replace("%", "")) / 100
return {str(prop.property_type): _prop_to_value(prop) for prop in f}
def get_inherent_proud_skill_list(data: "GenshinDetailCharacter") -> List[int]:
return [(skill.id * 100 + 1) for skill in data.skills if skill.skill_type == 2]
def get_prop_map(data: "GenshinDetailCharacter") -> Dict[str, Dict[str, Any]]:
level = str(data.base.level)
return {
"1001": {"ival": "0", "type": 1001}, # XP
"1002": {"ival": "6", "type": 1002, "val": "6"}, # Ascension
"4001": {"ival": level, "type": 4001, "val": level}, # Level
}
def get_skill_depot_id(data: "GenshinDetailCharacter") -> int:
skill = data.skills[0]
skill_id = skill.id % 10
if data.base.id in [10000005, 10000007]:
skill_id += 1
skill_id_pre = (data.base.id - 10000000) * 100
return skill_id_pre + skill_id
def get_skill_level_map(data: "GenshinDetailCharacter") -> Dict:
return {str(skill.id): skill.level for skill in data.skills if skill.skill_type == 1}
def get_talent_id_list(data: "GenshinDetailCharacter") -> List[int]:
return [constellation.id for constellation in data.constellations if constellation.activated]
def from_simnet_to_enka_single(index: int, data: "GenshinDetailCharacters") -> Dict:
character = data.characters[index]
avatar_id = character.base.id
equip_list = get_equip_list_loop(character)
fetter_info = get_fetter_info(character)
fight_prop_map = get_fight_prop_map(character)
inherent_proud_skill_list = get_inherent_proud_skill_list(character)
prop_map = get_prop_map(character)
skill_depot_id = get_skill_depot_id(character)
skill_level_map = get_skill_level_map(character)
talent_id_list = get_talent_id_list(character)
return {
"avatarId": avatar_id,
"equipList": equip_list,
"fetterInfo": fetter_info,
"propMap": prop_map,
"talentIdList": talent_id_list,
"skillDepotId": skill_depot_id,
"inherentProudSkillList": inherent_proud_skill_list,
"fightPropMap": fight_prop_map,
"skillLevelMap": skill_level_map,
}
def from_simnet_to_enka_loop(data: "GenshinDetailCharacters") -> List[Dict]:
d = []
for index, ch in enumerate(data.characters):
try:
d.append(from_simnet_to_enka_single(index, data))
except Exception as e:
cid = ch.base.id
logger.error("从 simnet 模型转换为 enka 模型时出现错误 cid[%s]", cid, exc_info=e)
return d
def from_simnet_to_enka(data: "GenshinDetailCharacters") -> Dict:
return {
"avatarInfoList": from_simnet_to_enka_loop(data),
}

View File

@ -34,7 +34,8 @@ from modules.apihelper.client.components.remote import Remote
from modules.gcsim.file import PlayerGCSimScripts from modules.gcsim.file import PlayerGCSimScripts
from modules.playercards.file import PlayerCardsFile from modules.playercards.file import PlayerCardsFile
from modules.playercards.helpers import ArtifactStatsTheory from modules.playercards.helpers import ArtifactStatsTheory
from plugins.tools.genshin import PlayerNotFoundError from modules.playercards.to_enka import from_simnet_to_enka
from plugins.tools.genshin import PlayerNotFoundError, GenshinHelper, CookiesNotFoundError
from utils.enkanetwork import RedisCache, EnkaNetworkAPI from utils.enkanetwork import RedisCache, EnkaNetworkAPI
from utils.helpers import download_resource from utils.helpers import download_resource
from utils.log import logger from utils.log import logger
@ -60,6 +61,7 @@ if TYPE_CHECKING:
from enkanetwork import CharacterInfo, EquipmentsStats from enkanetwork import CharacterInfo, EquipmentsStats
from telegram.ext import ContextTypes from telegram.ext import ContextTypes
from telegram import Update, Message from telegram import Update, Message
from simnet import GenshinClient
try: try:
import ujson as jsonlib import ujson as jsonlib
@ -74,6 +76,7 @@ class PlayerCards(Plugin):
template_service: TemplateService, template_service: TemplateService,
assets_service: AssetsService, assets_service: AssetsService,
redis: RedisDB, redis: RedisDB,
helper: GenshinHelper,
): ):
self.player_service = player_service self.player_service = player_service
self.client = EnkaNetworkAPI(lang="chs", user_agent=config.enka_network_api_agent, cache=False) self.client = EnkaNetworkAPI(lang="chs", user_agent=config.enka_network_api_agent, cache=False)
@ -85,6 +88,7 @@ class PlayerCards(Plugin):
self.kitsune: Optional[str] = None self.kitsune: Optional[str] = None
self.fight_prop_rule: Dict[str, Dict[str, float]] = {} self.fight_prop_rule: Dict[str, Dict[str, float]] = {}
self.damage_config: Dict = {} self.damage_config: Dict = {}
self.helper = helper
async def initialize(self): async def initialize(self):
await self._refresh() await self._refresh()
@ -122,6 +126,26 @@ class PlayerCards(Plugin):
error = "Enka.Network HTTP 服务请求错误,请稍后重试" error = "Enka.Network HTTP 服务请求错误,请稍后重试"
return error return error
async def _update_mihoyo_data(self, user_id: int, uid: int) -> Union[EnkaNetworkResponse, str]:
error = "发生未知错误"
try:
data = await self.cache.get(str(uid))
if data is not None:
return EnkaNetworkResponse.parse_obj(data)
async with self.helper.genshin(user_id=user_id, player_id=uid) as client:
client: "GenshinClient"
ids = await client.get_genshin_character_list()
raw_details = await client.get_genshin_character_detail([c.id for c in ids])
data = from_simnet_to_enka(raw_details)
data = await self.player_cards_file.merge_info(uid, data, use_old=True)
await self.cache.set(str(uid), data)
return EnkaNetworkResponse.parse_obj(data)
except FileNotFoundError:
error = "请先通过 Enka.Network 更新一次角色列表"
except (PlayerNotFoundError, CookiesNotFoundError):
error = "请先通过 cookie 绑定账号"
return error
async def _load_data_as_enka_response(self, uid) -> Optional[EnkaNetworkResponse]: async def _load_data_as_enka_response(self, uid) -> Optional[EnkaNetworkResponse]:
data = await self.player_cards_file.load_history_info(uid) data = await self.player_cards_file.load_history_info(uid)
if data is None: if data is None:
@ -192,8 +216,8 @@ class PlayerCards(Plugin):
buttons = [ buttons = [
[ [
InlineKeyboardButton( InlineKeyboardButton(
"更新面板", "更新",
callback_data=f"update_player_card|{user_id}|{uid}", callback_data=f"update_player_card|{user_id}|{uid}|enka",
) )
] ]
] ]
@ -213,8 +237,8 @@ class PlayerCards(Plugin):
buttons = [ buttons = [
[ [
InlineKeyboardButton( InlineKeyboardButton(
"更新面板", "更新",
callback_data=f"update_player_card|{user_id}|{uid}", callback_data=f"update_player_card|{user_id}|{uid}|enka",
) )
] ]
] ]
@ -272,14 +296,15 @@ class PlayerCards(Plugin):
message = update.effective_message message = update.effective_message
callback_query = update.callback_query callback_query = update.callback_query
async def get_player_card_callback(callback_query_data: str) -> Tuple[int, int]: async def get_player_card_callback(callback_query_data: str) -> Tuple[int, int, str]:
_data = callback_query_data.split("|") _data = callback_query_data.split("|")
_user_id = int(_data[1]) _user_id = int(_data[1])
_uid = int(_data[2]) _uid = int(_data[2])
logger.debug("callback_query_data函数返回 user_id[%s] uid[%s]", _user_id, _uid) _type = _data[3] if len(_data) > 3 else "enka"
return _user_id, _uid logger.debug("callback_query_data函数返回 user_id[%s] uid[%s] type[%s]", _user_id, _uid, _type)
return _user_id, _uid, _type
user_id, uid = await get_player_card_callback(callback_query.data) user_id, uid, update_type = await get_player_card_callback(callback_query.data)
if user.id != user_id: if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True) await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return return
@ -291,7 +316,12 @@ class PlayerCards(Plugin):
return return
await message.reply_chat_action(ChatAction.TYPING) await message.reply_chat_action(ChatAction.TYPING)
data = await self._update_enka_data(uid) if update_type == "enka":
data = await self._update_enka_data(uid)
text = "正在从 Enka.Network 获取角色列表 请不要重复点击按钮"
else:
data = await self._update_mihoyo_data(user_id, uid)
text = "正在从米忽悠获取角色列表 请不要重复点击按钮"
if isinstance(data, str): if isinstance(data, str):
await callback_query.answer(text=data, show_alert=True) await callback_query.answer(text=data, show_alert=True)
return return
@ -303,7 +333,7 @@ class PlayerCards(Plugin):
await message.delete() await message.delete()
return return
self.player_gcsim_scripts.remove_fits(uid) self.player_gcsim_scripts.remove_fits(uid)
await callback_query.answer(text="正在从 Enka.Network 获取角色列表 请不要重复点击按钮") await callback_query.answer(text=text)
buttons = self.gen_button(data, user.id, uid, update_button=False) buttons = self.gen_button(data, user.id, uid, update_button=False)
render_data = await self.parse_holder_data(data) render_data = await self.parse_holder_data(data)
holder = await self.template_service.render( holder = await self.template_service.render(
@ -436,8 +466,14 @@ class PlayerCards(Plugin):
if update_button: if update_button:
last_button.append( last_button.append(
InlineKeyboardButton( InlineKeyboardButton(
"更新面板", "更新",
callback_data=f"update_player_card|{user_id}|{uid}", callback_data=f"update_player_card|{user_id}|{uid}|enka",
)
)
last_button.append(
InlineKeyboardButton(
"更新全部",
callback_data=f"update_player_card|{user_id}|{uid}|mihoyo",
) )
) )
if next_page: if next_page:
@ -526,6 +562,25 @@ class Artifact(BaseModel):
return mapping.get(label, "text-neutral-400") return mapping.get(label, "text-neutral-400")
class DamageResultNew(BaseModel):
critical: float
non_critical: float
expectation: float
is_heal: bool
is_shield: bool
@classmethod
def parse_from(cls, value) -> "DamageResultNew":
return cls(
critical=value.critical,
non_critical=value.non_critical,
expectation=value.expectation,
is_heal=value.is_heal,
is_shield=value.is_shield,
)
class RenderTemplate: class RenderTemplate:
def __init__( def __init__(
self, self,
@ -592,6 +647,9 @@ class RenderTemplate:
if damage_config is not None: if damage_config is not None:
try: try:
data["damage_info"] = self.render_damage(damage_config) data["damage_info"] = self.render_damage(damage_config)
for damage in data["damage_info"]:
if damage["damage"] is not None:
damage["damage"] = DamageResultNew.parse_from(damage["damage"])
except JsonParseException as _exc: except JsonParseException as _exc:
logger.error(str(_exc)) logger.error(str(_exc))
except EnkaParseException as _exc: except EnkaParseException as _exc:
@ -689,7 +747,7 @@ class RenderTemplate:
elif stat[1].id != 26: # 治疗加成 elif stat[1].id != 26: # 治疗加成
continue continue
value = stat[1].to_rounded() if isinstance(stat[1], Stats) else stat[1].to_percentage_symbol() value = stat[1].to_rounded() if isinstance(stat[1], Stats) else stat[1].to_percentage_symbol()
if value in ("0%", 0): if value in ("0%", "0.0%", 0):
continue continue
name = DEFAULT_EnkaAssets.get_hash_map(stat[0]) name = DEFAULT_EnkaAssets.get_hash_map(stat[0])
if name is None: if name is None: