"""危局强袭战数据查询"""
import math
from functools import lru_cache, partial
from typing import List, Optional, Tuple, TYPE_CHECKING
from simnet.models.zzz.chronicle.challenge_mem import ZZZChallengeMem
from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction, ParseMode
from telegram.ext import CallbackContext, filters, ContextTypes
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.cookies.error import TooManyRequestPublicCookies
from core.services.history_data.models import HistoryDataChallengeMem
from core.services.history_data.services import HistoryDataChallengeMemServices
from core.services.template.models import RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
if TYPE_CHECKING:
from simnet import ZZZClient
cmd_pattern = r"(?i)^/challenge_mem(?:@[\w]+)?\s*((?:\d+)|(?:all))?\s*(pre)?"
msg_pattern = r"^危局强袭战数据((?:查询)|(?:总览))(上期)?\D?(\d*)?.*?$"
MAX_FLOOR = 3
MAX_STARS = MAX_FLOOR * 3
@lru_cache
def get_args(text: str) -> bool:
prev = "pre" in text or "上期" in text
return prev
class AbyssUnlocked(Exception):
"""根本没动"""
class AbyssFastPassed(Exception):
"""快速通过,无数据"""
class ChallengeMemPlugin(Plugin):
"""危局强袭战数据查询"""
def __init__(
self,
template: TemplateService,
helper: GenshinHelper,
assets_service: AssetsService,
history_data_abyss: HistoryDataChallengeMemServices,
redis: RedisDB,
):
self.template_service = template
self.helper = helper
self.assets_service = assets_service
self.history_data_abyss = history_data_abyss
self.cache = RedisCache(redis.client, key="plugin:challenge_mem:history")
async def get_uid(self, user_id: int, reply: Optional[Message], player_id: int, offset: int) -> int:
"""通过消息获取 uid,优先级:args > reply > self"""
uid, user_id_ = player_id, user_id
if reply:
try:
user_id_ = reply.from_user.id
except AttributeError:
pass
if not uid:
player_info = await self.helper.players_service.get_player(user_id_, offset=offset)
if player_info is not None:
uid = player_info.player_id
if (not uid) and (user_id_ != user_id):
player_info = await self.helper.players_service.get_player(user_id, offset=offset)
if player_info is not None:
uid = player_info.player_id
return uid
@handler.command("challenge_mem", block=False)
@handler.message(filters.Regex(msg_pattern), block=False)
async def command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
uid, offset = self.get_real_uid_or_offset(update)
args = self.get_args(context)
message = update.effective_message
uid: int = await self.get_uid(user_id, message.reply_to_message, uid, offset)
# 若查询帮助
if (message.text.startswith("/") and "help" in message.text) or "帮助" in message.text:
await message.reply_text(
"危局强袭战数据功能使用帮助(中括号表示可选参数)\n\n"
"指令格式:\n/challenge_mem + [层数/all] + [pre]
\n(pre
表示上期)\n\n"
"文本格式:\n危局强袭战数据 + 查询/总览 + [上期] + [层数]
\n\n"
"例如以下指令都正确:\n"
"/challenge_mem
\n/challenge 1 pre
\n/challenge all pre
\n"
"危局强袭战数据查询
\n危局强袭战数据查询上期第1层
\n危局强袭战数据总览上期
",
parse_mode=ParseMode.HTML,
)
self.log_user(update, logger.info, "查询[bold]危局强袭战数据[/bold]帮助", extra={"markup": True})
return
# 解析参数
previous = get_args(" ".join([i for i in args if not i.startswith("@")]))
self.log_user(
update,
logger.info,
"[bold]危局强袭战挑战数据[/bold]请求: uid=%s previous=%s",
uid,
previous,
extra={"markup": True},
)
async def reply_message_func(content: str) -> None:
_reply_msg = await message.reply_text(f"绳匠 ({uid}
) {content}", parse_mode=ParseMode.HTML)
reply_text: Optional[Message] = None
try:
async with self.helper.genshin_or_public(user_id, uid=uid) as client:
reply_text = await message.reply_text(
f"{config.notice.bot_name} 需要时间整理危局强袭战数据,还请耐心等待哦~"
)
await message.reply_chat_action(ChatAction.TYPING)
abyss_data = await self.get_rendered_pic_data(client, uid, previous)
images = await self.get_rendered_pic(abyss_data, uid)
except TooManyRequestPublicCookies:
reply_message = await message.reply_text("查询次数太多,请您稍后重试")
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message)
self.add_delete_message_job(message)
return
except AbyssUnlocked: # 若危局强袭战未解锁
await reply_message_func("还未解锁危局强袭战哦~")
return
except AbyssFastPassed: # 若危局强袭战已快速通过
await reply_message_func("本层已被快速通过,无详细数据~")
return
except IndexError: # 若危局强袭战为挑战此层
await reply_message_func("还没有挑战本层呢,咕咕咕~")
return
except ValueError as e:
if uid:
await reply_message_func("UID 输入错误,请重新输入")
return
raise e
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await images.reply_photo(message)
if reply_text is not None:
await reply_text.delete()
self.log_user(update, logger.info, "[bold]危局强袭战挑战数据[/bold]: 成功发送图片", extra={"markup": True})
async def get_rendered_pic_data(self, client: "ZZZClient", uid: int, previous: bool) -> "ZZZChallengeMem":
abyss_data = await client.get_zzz_challenge_mem(uid, previous=previous, lang="zh-cn")
if abyss_data.has_data and abyss_data.list:
await self.save_abyss_data(self.history_data_abyss, uid, abyss_data)
return abyss_data
@staticmethod
def from_seconds_to_hours(seconds: int) -> str:
hours = seconds / 3600
minutes = (seconds % 3600) / 60
sec = seconds % 60
return f"{int(hours)}时{int(minutes)}分{int(sec)}秒"
async def get_rendered_pic( # skipcq: PY-R1000 #
self,
abyss_data: "ZZZChallengeMem",
uid: int,
) -> RenderResult:
"""
获取渲染后的图片
Args:
abyss_data (ZZZChallenge): 危局强袭战数据
uid (int): 需要查询的 uid
Returns:
bytes格式的图片
"""
if not abyss_data.has_data or not abyss_data.list:
raise AbyssUnlocked()
start_time = abyss_data.begin_time.datetime.strftime("%m月%d日 %H:%M")
end_time = abyss_data.end_time.datetime.strftime("%m月%d日 %H:%M")
render_data = {
"title": "危局强袭战",
"start_time": start_time,
"end_time": end_time,
"stars": abyss_data.total_star,
"scores": abyss_data.total_score,
"uid": mask_number(uid),
"abyss_data": abyss_data,
}
floors = []
character_icons = {}
buddy_icons = {}
for floor_data in abyss_data.list:
for ch in floor_data.avatar_list:
character_icons[ch.id] = self.assets_service.avatar.square(ch.id).as_uri()
if bu := floor_data.buddy:
buddy_icons[bu.id] = self.assets_service.buddy.icon(bu.id).as_uri()
floors.append(
{
"floor": floor_data,
"floor_time": floor_data.challenge_time.datetime.strftime("%Y-%m-%d %H:%M:%S"),
}
)
render_data["character_icons"] = character_icons
render_data["buddy_icons"] = buddy_icons
render_data["floors"] = floors[::-1]
return await self.template_service.render(
"zzz/abyss/mem.jinja2",
render_data,
viewport={"width": 1162, "height": 4000},
query_selector=".container",
)
@staticmethod
async def save_abyss_data(
history_data_abyss: "HistoryDataChallengeMemServices", uid: int, abyss_data: "ZZZChallengeMem"
) -> bool:
model = history_data_abyss.create(uid, abyss_data)
old_data = await history_data_abyss.get_by_user_id_data_id(uid, model.data_id)
exists = history_data_abyss.exists_data(model, old_data)
if not exists:
await history_data_abyss.add(model)
return True
return False
async def get_abyss_data(self, uid: int):
return await self.history_data_abyss.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataChallengeMem"):
last_battles = data.abyss_data.list[0]
start_time = last_battles.challenge_time.datetime
time = start_time.strftime("%Y.%m.%d")
name = ""
honor = ""
if data.abyss_data.total_star == MAX_STARS:
honor = "👑"
num_of_characters = len(last_battles.avatar_list)
if num_of_characters == 2:
honor = "双通"
elif num_of_characters == 1:
honor = "单通"
return f"{name} {time} {data.abyss_data.total_star} ★ {honor}".strip()
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_abyss_data(uid)
data.sort(key=lambda x: x.id, reverse=True)
abyss_data = [HistoryDataChallengeMem.from_data(i) for i in data]
buttons = [
{
"name": self.get_season_data_name(abyss_data[idx]),
"value": f"get_challenge_mem_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 2] for i in range(0, len(buttons), 2)]
send_buttons = all_buttons[(page - 1) * 7 : page * 7]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 7)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_challenge_mem_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_challenge_mem_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_challenge_mem_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@handler.command("challenge_mem_history", block=False)
@handler.message(filters.Regex(r"^危局强袭战历史数据"), block=False)
async def abyss_history_command_start(self, update: Update, _: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
uid: int = await self.get_uid(user_id, message.reply_to_message, uid, offset)
self.log_user(update, logger.info, "查询危局强袭战历史数据 uid[%s]", uid)
async with self.helper.genshin_or_public(user_id, uid=uid) as _:
await self.get_session_button_data(user_id, uid, force=True)
buttons = await self.gen_season_button(user_id, uid)
if not buttons:
await message.reply_text("还没有危局强袭战历史数据哦~")
return
await message.reply_text("请选择要查询的危局强袭战历史数据", reply_markup=InlineKeyboardMarkup(buttons))
async def get_challenge_mem_history_page(self, update: "Update", user_id: int, uid: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换危局强袭战历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin_or_public(user_id) as _:
buttons = await self.gen_season_button(user_id, uid, page)
if not buttons:
await callback_query.answer("还没有危局强袭战历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有危局强袭战历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page} 页", show_alert=False)
async def get_challenge_mem_history_floor(self, update: "Update", data_id: int):
"""渲染层数数据"""
callback_query = update.callback_query
message = callback_query.message
reply = None
if message.reply_to_message:
reply = message.reply_to_message
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataChallengeMem.from_data(data)
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
images = await self.get_rendered_pic(abyss_data.abyss_data, data.user_id)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
await images.reply_photo(reply or message)
self.log_user(update, logger.info, "[bold]危局强袭战挑战数据[/bold]: 成功发送图片", extra={"markup": True})
self.add_delete_message_job(message, delay=1)
@handler.callback_query(pattern=r"^get_challenge_mem_history\|", block=False)
async def get_challenge_mem_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
async def get_challenge_mem_history_callback(
callback_query_data: str,
) -> Tuple[str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
logger.debug(
"callback_query_data函数返回 result[%s] user_id[%s] uid[%s]",
_result,
_user_id,
_uid,
)
return _result, _user_id, _uid
result, user_id, uid = await get_challenge_mem_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_challenge_mem_history_page(update, user_id, uid, result)
return
data_id = int(result)
await self.get_challenge_mem_history_floor(update, data_id)
async def abyss_use_by_inline(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE", previous: bool):
callback_query = update.callback_query
user = update.effective_user
user_id = user.id
uid = IInlineUseData.get_uid_from_context(context)
self.log_user(update, logger.info, "查询危局强袭战挑战总览数据 previous[%s]", previous)
notice = None
try:
async with self.helper.genshin_or_public(user_id, uid=uid) as client:
if not client.public:
await client.get_record_cards()
abyss_data = await self.get_rendered_pic_data(client, uid, previous)
image = await self.get_rendered_pic(abyss_data, uid)
except AbyssUnlocked: # 若深渊未解锁
notice = "还未解锁危局强袭战哦~"
except TooManyRequestPublicCookies:
notice = "查询次数太多,请您稍后重试"
if notice:
await callback_query.answer(notice, show_alert=True)
return
await image.edit_inline_media(callback_query)
async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]:
return [
IInlineUseData(
text="本期危局强袭战总览",
hash="challenge_mem_current",
callback=partial(self.abyss_use_by_inline, previous=False),
player=True,
),
IInlineUseData(
text="上期危局强袭战总览",
hash="challenge_mem_previous",
callback=partial(self.abyss_use_by_inline, previous=True),
player=True,
),
]