PamGram/plugins/starrail/player_cards.py

600 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import math
from typing import List, Tuple, Union, Optional, TYPE_CHECKING, Dict
from pydantic import BaseModel
from starrailrelicscore.client.character import Character as CharacterClient
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Message
from telegram.constants import ChatAction
from telegram.ext import filters
from core.config import config
from core.dependence.assets import AssetsService, AssetsCouldNotFound
from core.dependence.redisdb import RedisDB
from core.plugin import Plugin, handler
from core.services.players import PlayersService
from core.services.template.services import TemplateService
from core.services.wiki.services import WikiService
from metadata.shortname import roleToName, idToRole
from modules.apihelper.client.components.player_cards import PlayerCards as PlayerCardsClient, PlayerInfo, Avatar, Relic
from modules.apihelper.client.components.remote import Remote
from plugins.tools.genshin import PlayerNotFoundError
from utils.log import logger
from utils.uid import mask_number
try:
from starrail_damage_cal.mihomo.models import Avatar as DamageAvatar
from starrail_damage_cal.to_data import get_data as get_damage_data
from starrail_damage_cal.cal_damage import cal_info as cal_damage_info
from msgspec import convert as msgspec_convert
STARRAIL_ARTIFACT_FUNCTION_AVAILABLE = True
except ImportError:
DamageAvatar = None
get_damage_data = None
cal_damage_info = None
msgspec_convert = None
STARRAIL_ARTIFACT_FUNCTION_AVAILABLE = False
if TYPE_CHECKING:
from telegram.ext import ContextTypes
from telegram import Update
from starrailrelicscore.models.relic_scorer import Score, TotalScore
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
DEP_MSG = "自 2.0 版本开始,不再推荐使用此功能,推荐使用 /role_detail 查询角色信息。"
class PlayerCards(Plugin):
def __init__(
self,
player_service: PlayersService,
template_service: TemplateService,
assets_service: AssetsService,
wiki_service: WikiService,
redis: RedisDB,
):
self.player_service = player_service
self.client = PlayerCardsClient(redis)
self.cache = self.client.cache
self.assets_service = assets_service
self.template_service = template_service
self.wiki_service = wiki_service
self.kitsune: Optional[str] = None
self.fight_prop_rule: Dict[str, Dict[str, float]] = {}
async def initialize(self):
await self.client.async_init()
await self._refresh()
async def _refresh(self):
self.fight_prop_rule = await Remote.get_fight_prop_rule_data()
async def _load_history(self, uid) -> Optional[PlayerInfo]:
data = await self.client.player_cards_file.load_history_info(uid)
if data is None:
return None
return PlayerInfo.parse_obj(data)
async def get_uid_and_ch(
self, user_id: int, args: List[str], reply: Optional[Message]
) -> Tuple[Optional[int], Optional[str]]:
"""通过消息获取 uid优先级args > reply > self"""
uid, ch_name, user_id_ = None, None, user_id
if args:
for i in args:
if i is not None:
if i.isdigit() and len(i) == 9:
uid = int(i)
else:
ch_name = roleToName(i)
if reply:
try:
user_id_ = reply.from_user.id
except AttributeError:
pass
if not uid:
player_info = await self.player_service.get_player(user_id_)
if player_info is not None:
uid = player_info.player_id
if (not uid) and (user_id_ != user_id):
player_info = await self.player_service.get_player(user_id)
if player_info is not None:
uid = player_info.player_id
return uid, ch_name
def get_caption(self, character: "Avatar") -> str:
tags = [idToRole(character.avatarId), f"等级{character.level}", f"命座{character.rank}"]
if equip := character.equipment:
if weapon_detail := self.wiki_service.light_cone.get_by_id(equip.tid):
tags.append(weapon_detail.name)
tags.append(f"武器等级{equip.level}")
tags.append(f"{equip.rank}")
return "#" + " #".join(tags)
@handler.command(command="player_card", player=True, block=False)
@handler.message(filters=filters.Regex("^角色卡片查询(.*)"), player=True, block=False)
async def player_cards(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
args = self.get_args(context)
await message.reply_chat_action(ChatAction.TYPING)
uid, ch_name = await self.get_uid_and_ch(user_id, args, message.reply_to_message)
if uid is None:
raise PlayerNotFoundError(user_id)
data = await self._load_history(uid)
if data is None or len(data.avatarList) == 0:
if isinstance(self.kitsune, str):
photo = self.kitsune
else:
photo = open("resources/img/aaa.jpg", "rb")
buttons = [
[
InlineKeyboardButton(
"更新面板",
callback_data=f"update_player_card|{user_id}|{uid}",
)
]
]
reply_message = await message.reply_photo(
photo=photo,
caption=f"角色列表未找到,请尝试点击下方按钮更新角色列表 - UID {uid}\n\n{DEP_MSG}",
reply_markup=InlineKeyboardMarkup(buttons),
)
if reply_message.photo:
self.kitsune = reply_message.photo[-1].file_id
return
if ch_name is not None:
self.log_user(
update,
logger.info,
"角色卡片查询命令请求 || character_name[%s] uid[%s]",
ch_name,
uid,
)
else:
self.log_user(update, logger.info, "角色卡片查询命令请求")
ttl = await self.cache.ttl(uid)
buttons = self.gen_button(data, user_id, uid, update_button=ttl < 0)
if isinstance(self.kitsune, str):
photo = self.kitsune
else:
photo = open("resources/img/aaa.jpg", "rb")
reply_message = await message.reply_photo(
photo=photo,
caption=f"请选择你要查询的角色 - UID {uid}\n\n{DEP_MSG}",
reply_markup=InlineKeyboardMarkup(buttons),
)
if reply_message.photo:
self.kitsune = reply_message.photo[-1].file_id
return
for characters in data.avatarList:
if idToRole(characters.avatarId) == ch_name:
break
else:
await message.reply_text(
f"角色展柜中未找到 {ch_name} ,请检查角色是否存在于角色展柜中,或者等待角色数据更新后重试\n\n{DEP_MSG}"
)
return
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
render_result = await RenderTemplate(
uid,
characters,
self.template_service,
self.assets_service,
self.wiki_service,
self.client,
self.fight_prop_rule,
).render() # pylint: disable=W0631
await render_result.reply_photo(
message,
filename=f"player_card_{uid}_{ch_name}.png",
caption=self.get_caption(characters),
)
@handler.callback_query(pattern=r"^update_player_card\|", block=False)
async def update_player_card(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user = update.effective_user
message = update.effective_message
callback_query = update.callback_query
async def get_player_card_callback(callback_query_data: str) -> Tuple[int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
logger.debug("callback_query_data函数返回 user_id[%s] uid[%s]", _user_id, _uid)
return _user_id, _uid
user_id, uid = await get_player_card_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
ttl = await self.cache.ttl(uid)
if ttl > 0:
await callback_query.answer(text=f"请等待 {ttl} 秒后再更新", show_alert=True)
return
await message.reply_chat_action(ChatAction.TYPING)
await callback_query.answer(text="正在获取角色列表 请不要重复点击按钮")
data = await self.client.update_data(str(uid))
if isinstance(data, str):
await callback_query.answer(text=data, show_alert=True)
return
if data.avatarList is None:
await message.delete()
await callback_query.answer(
"请先将角色加入到角色展柜并允许查看角色详情后再使用此功能,如果已经添加了角色,请等待角色数据更新后重试",
show_alert=True,
)
return
buttons = self.gen_button(data, user.id, uid, update_button=False)
render_data = await self.parse_holder_data(data)
holder = await self.template_service.render(
"starrail/player_card/holder.html",
render_data,
viewport={"width": 750, "height": 380},
ttl=60 * 10,
caption=f"更新角色列表成功,请选择你要查询的角色 - UID {uid}\n\n{DEP_MSG}",
)
await holder.edit_media(message, reply_markup=InlineKeyboardMarkup(buttons))
@handler.callback_query(pattern=r"^get_player_card\|", block=False)
async def get_player_cards(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
async def get_player_card_callback(
callback_query_data: str,
) -> Tuple[str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
logger.debug(
"callback_query_data函数返回 result[%s] user_id[%s] uid[%s]",
_result,
_user_id,
_uid,
)
return _result, _user_id, _uid
result, user_id, uid = await get_player_card_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
page = 0
if result.isdigit():
page = int(result)
logger.info(
"用户 %s[%s] 角色卡片查询命令请求 || page[%s] uid[%s]",
user.full_name,
user.id,
page,
uid,
)
else:
logger.info(
"用户 %s[%s] 角色卡片查询命令请求 || character_name[%s] uid[%s]",
user.full_name,
user.id,
result,
uid,
)
data = await self._load_history(uid)
if isinstance(data, str):
await message.reply_text(data)
return
if data.avatarList is None:
await message.delete()
await callback_query.answer(
"请先将角色加入到角色展柜并允许查看角色详情后再使用此功能,如果已经添加了角色,请等待角色数据更新后重试",
show_alert=True,
)
return
if page:
buttons = self.gen_button(data, user.id, uid, page, await self.cache.ttl(uid) <= 0)
await message.edit_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
return
for characters in data.avatarList:
if idToRole(characters.avatarId) == result:
break
else:
await message.delete()
await callback_query.answer(
f"角色展柜中未找到 {result} ,请检查角色是否存在于角色展柜中,或者等待角色数据更新后重试",
show_alert=True,
)
return
await callback_query.answer(text="正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
render_result = await RenderTemplate(
uid,
characters,
self.template_service,
self.assets_service,
self.wiki_service,
self.client,
self.fight_prop_rule,
).render() # pylint: disable=W0631
render_result.filename = f"player_card_{uid}_{result}.png"
render_result.caption = self.get_caption(characters)
await render_result.edit_media(message)
@staticmethod
def gen_button(
data: PlayerInfo,
user_id: Union[str, int],
uid: int,
page: int = 1,
update_button: bool = True,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
buttons = []
if data.avatarList:
buttons = [
InlineKeyboardButton(
idToRole(value.avatarId),
callback_data=f"get_player_card|{user_id}|{uid}|{idToRole(value.avatarId)}",
)
for value in data.avatarList
if value.avatarId
]
all_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)]
send_buttons = all_buttons[(page - 1) * 3 : page * 3]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 3)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_player_card|{user_id}|{uid}|{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_player_card|{user_id}|{uid}|empty_data",
)
)
if update_button:
last_button.append(
InlineKeyboardButton(
"更新面板",
callback_data=f"update_player_card|{user_id}|{uid}",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_player_card|{user_id}|{uid}|{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
async def parse_holder_data(self, data: PlayerInfo) -> dict:
"""
生成渲染所需数据
"""
characters_data = []
for idx, character in enumerate(data.avatarList):
cid = character.avatarId
try:
characters_data.append(
{
"level": character.level,
"constellation": character.rank,
"icon": self.assets_service.avatar.square(cid).as_uri(),
}
)
except AssetsCouldNotFound:
logger.warning("角色 %s 的头像资源获取失败", cid)
if idx == 7:
break
return {
"uid": mask_number(data.uid),
"level": data.level or 0,
"signature": data.signature or "",
"characters": characters_data,
}
class Artifact(BaseModel, frozen=False):
tid: int = 0
# ID
equipment: Dict = {}
# 圣遗物评分
score: float = 0
# 圣遗物评级
score_label: str = "E"
# 圣遗物评级颜色
score_class: str = ""
# 副词条分数
substat_scores: List[float] = []
def set_score(self, result: "Score"):
self.score = result.score
self.score_label = result.rating
self.score_class = self.get_score_class(result.rating)
self.substat_scores = result.sub_stat_score
@staticmethod
def get_score_class(label: str) -> str:
mapping = {
"F": "text-neutral-400",
"D": "text-neutral-200",
"C": "text-violet-400",
"B": "text-violet-400",
"A": "text-yellow-400",
"S": "text-yellow-400",
"SS": "text-yellow-400",
"SSS": "text-red-500",
"WTF": "text-red-500",
}
return mapping.get(label.replace("+", ""), "text-neutral-400")
class RenderTemplate:
def __init__(
self,
uid: Union[int, str],
character: Avatar,
template_service: TemplateService,
assets_service: AssetsService,
wiki_service: WikiService,
client: PlayerCardsClient,
fight_prop_rule: Dict[str, Dict[str, float]],
):
self.uid = uid
self.template_service = template_service
self.character = character
self.assets_service = assets_service
self.wiki_service = wiki_service
self.client = client
self.fight_prop_rule = fight_prop_rule
async def render(self):
images = await self.cache_images()
score = self.cal_avatar_relic_score()
artifact_total_score: float = 0
artifacts = self.find_artifacts()
if score and score.relics:
relic_map = {relic.tid: relic for relic in artifacts}
for relic in score.relics:
artifact = relic_map.get(relic.tid)
if artifact:
artifact.set_score(relic)
artifact_total_score = score.total_score
artifact_total_score = round(artifact_total_score, 1)
artifact_total_score_label: str = score.total_rating
weapon = None
weapon_detail = None
if self.character.equipment and self.character.equipment.tid:
weapon = self.character.equipment
weapon_detail = self.wiki_service.light_cone.get_by_id(self.character.equipment.tid)
skills = [0, 0, 0, 0, 0]
for index in range(5):
skills[index] = self.character.skillTreeList[index].level
data = {
"uid": mask_number(self.uid),
"character": self.character,
"character_detail": self.wiki_service.character.get_by_id(self.character.avatarId),
"weapon": weapon,
"weapon_detail": weapon_detail,
# 圣遗物评分
"artifact_total_score": artifact_total_score,
# 圣遗物评级
"artifact_total_score_label": artifact_total_score_label,
# 圣遗物评级颜色
"artifact_total_score_class": Artifact.get_score_class(artifact_total_score_label),
"artifacts": artifacts,
"skills": skills,
"images": images,
**(await self.cal_avatar_damage()),
}
return await self.template_service.render(
"starrail/player_card/player_card.html",
data,
{"width": 1000, "height": 1200},
full_page=True,
query_selector=".text-neutral-200",
ttl=7 * 24 * 60 * 60,
)
async def cache_images(self):
c = self.character
cid = c.avatarId
data = {
"banner_url": self.assets_service.avatar.gacha(cid).as_uri(),
"skills": [i.as_uri() for i in self.assets_service.avatar.skills(cid)][:-1],
"constellations": [i.as_uri() for i in self.assets_service.avatar.eidolons(cid)],
"equipment": "",
}
if c.equipment and c.equipment.tid:
data["equipment"] = self.assets_service.light_cone.icon(c.equipment.tid).as_uri()
return data
def find_artifacts(self) -> List[Artifact]:
"""在 equipments 数组中找到圣遗物,并转换成带有分数的 model。equipments 数组包含圣遗物和武器"""
def fix_relic(e: Relic) -> Dict:
rid = e.tid
affix = self.client.get_affix_by_id(rid)
relic_set = self.wiki_service.relic.get_by_id(affix.set_id)
try:
icon = relic_set.image_list[affix.type.num]
except IndexError:
icon = relic_set.icon
return {
"id": rid,
"name": relic_set.name,
"icon": icon,
"level": e.level,
"rank": affix.rarity,
"main_sub": self.client.get_affix(e, True, False)[0],
"sub": self.client.get_affix(e, False, True),
}
relic_list = self.character.relicList or []
return [
Artifact(
tid=e.tid,
equipment=fix_relic(e),
)
for e in relic_list
if self.client.get_affix_by_id(e.tid) is not None
]
def cal_avatar_relic_score(self) -> Optional["TotalScore"]:
try:
return CharacterClient.score_character(self.character)
except Exception:
logger.warning("计算角色圣遗物分数时出现错误 uid[%s] avatar[%s]", self.uid, self.character.avatarId)
return None
async def cal_avatar_damage(self) -> Dict:
if not STARRAIL_ARTIFACT_FUNCTION_AVAILABLE:
return {
"damage_function_available": False,
}
try:
data = self.character.dict()
if "property" in data:
del data["property"]
avatar = msgspec_convert(data, type=DamageAvatar)
damage_data = await get_damage_data(avatar, "", str(self.uid))
damage_info = await cal_damage_info(damage_data[0])
return {
"damage_function_available": True,
"damage_info": damage_info,
}
except Exception:
logger.warning("计算角色伤害时出现错误 uid[%s] avatar[%s]", self.uid, self.character.avatarId)
return {
"damage_function_available": False,
}