Support starrail challenge boss

This commit is contained in:
xtaodada 2024-06-19 23:23:45 +08:00
parent 818ae227d3
commit 01e4a2ee87
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
10 changed files with 885 additions and 5 deletions

View File

@ -2,6 +2,7 @@ import enum
from pydantic import BaseModel from pydantic import BaseModel
from simnet.models.starrail.chronicle.challenge import StarRailChallenge from simnet.models.starrail.chronicle.challenge import StarRailChallenge
from simnet.models.starrail.chronicle.challenge_boss import StarRailChallengeBoss, StarRailChallengeBossGroup
from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup
from simnet.models.starrail.diary import StarRailDiary from simnet.models.starrail.diary import StarRailDiary
@ -12,6 +13,7 @@ __all__ = (
"HistoryDataTypeEnum", "HistoryDataTypeEnum",
"HistoryDataAbyss", "HistoryDataAbyss",
"HistoryDataChallengeStory", "HistoryDataChallengeStory",
"HistoryDataChallengeBoss",
"HistoryDataLedger", "HistoryDataLedger",
) )
@ -20,6 +22,7 @@ class HistoryDataTypeEnum(int, enum.Enum):
ABYSS = 0 # 混沌回忆 ABYSS = 0 # 混沌回忆
CHALLENGE_STORY = 1 # 虚构叙事 CHALLENGE_STORY = 1 # 虚构叙事
LEDGER = 2 # 开拓月历 LEDGER = 2 # 开拓月历
CHALLENGE_BOSS = 3 # 末日幻影
class HistoryDataAbyss(BaseModel): class HistoryDataAbyss(BaseModel):
@ -39,6 +42,15 @@ class HistoryDataChallengeStory(BaseModel):
return cls.parse_obj(data.data) return cls.parse_obj(data.data)
class HistoryDataChallengeBoss(BaseModel):
boss_data: StarRailChallengeBoss
group: StarRailChallengeBossGroup
@classmethod
def from_data(cls, data: HistoryData) -> "HistoryDataChallengeBoss":
return cls.parse_obj(data.data)
class HistoryDataLedger(BaseModel): class HistoryDataLedger(BaseModel):
diary_data: StarRailDiary diary_data: StarRailDiary

View File

@ -3,6 +3,7 @@ from typing import List
from pytz import timezone from pytz import timezone
from simnet.models.starrail.chronicle.challenge import StarRailChallenge from simnet.models.starrail.chronicle.challenge import StarRailChallenge
from simnet.models.starrail.chronicle.challenge_boss import StarRailChallengeBoss, StarRailChallengeBossGroup
from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup
from simnet.models.starrail.diary import StarRailDiary from simnet.models.starrail.diary import StarRailDiary
@ -12,6 +13,7 @@ from core.services.history_data.models import (
HistoryDataAbyss, HistoryDataAbyss,
HistoryDataChallengeStory, HistoryDataChallengeStory,
HistoryDataLedger, HistoryDataLedger,
HistoryDataChallengeBoss,
) )
from gram_core.base_service import BaseService from gram_core.base_service import BaseService
from gram_core.services.history_data.services import HistoryDataBaseServices from gram_core.services.history_data.services import HistoryDataBaseServices
@ -26,6 +28,7 @@ __all__ = (
"HistoryDataBaseServices", "HistoryDataBaseServices",
"HistoryDataAbyssServices", "HistoryDataAbyssServices",
"HistoryDataChallengeStoryServices", "HistoryDataChallengeStoryServices",
"HistoryDataChallengeBossServices",
"HistoryDataLedgerServices", "HistoryDataLedgerServices",
) )
@ -82,6 +85,38 @@ class HistoryDataChallengeStoryServices(BaseService, HistoryDataBaseServices):
) )
class HistoryDataChallengeBossServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.CHALLENGE_BOSS.value
@staticmethod
def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool:
def _get_data(_data: HistoryData):
detail = _data.data.get("boss_data", {}).get("all_floor_detail")
_avatars = []
for floor in detail:
for avatar in floor.get("avatars", []):
_avatars.append(avatar["id"])
return _avatars
avatars = _get_data(data)
return any(_get_data(d) == avatars for d in old_data)
@staticmethod
def create(user_id: int, boss_data: StarRailChallengeBoss, group: StarRailChallengeBossGroup):
data = HistoryDataChallengeBoss(boss_data=boss_data, group=group)
json_data = data.json(by_alias=True, encoder=json_encoder)
dict_data = jsonlib.loads(json_data)
dict_data["boss_data"]["groups"] = []
return HistoryData(
user_id=user_id,
data_id=group.season,
time_created=datetime.datetime.now(),
type=HistoryDataChallengeBossServices.DATA_TYPE,
data=dict_data,
)
class HistoryDataLedgerServices(BaseService, HistoryDataBaseServices): class HistoryDataLedgerServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.LEDGER.value DATA_TYPE = HistoryDataTypeEnum.LEDGER.value

View File

@ -2187,10 +2187,10 @@ files = [
[[package]] [[package]]
name = "simnet" name = "simnet"
version = "0.1.21" version = "0.1.22"
requires_python = "<4.0,>=3.8" requires_python = "<4.0,>=3.8"
git = "https://github.com/PaiGramTeam/SIMNet" git = "https://github.com/PaiGramTeam/SIMNet"
revision = "0b8ef8e8e35d13b15c13dfaf6ddf28a83c2b72a7" revision = "13a0d8a4f626c22484b3650eef5c177fa2939ac8"
summary = "Modern API wrapper for Genshin Impact & Honkai: Star Rail built on asyncio and pydantic." summary = "Modern API wrapper for Genshin Impact & Honkai: Star Rail built on asyncio and pydantic."
groups = ["default"] groups = ["default"]
dependencies = [ dependencies = [

View File

@ -50,6 +50,8 @@ class SetCommandPlugin(Plugin):
BotCommand("challenge_history", "混沌回忆历史信息查询"), BotCommand("challenge_history", "混沌回忆历史信息查询"),
BotCommand("challenge_story", "虚构叙事信息查询"), BotCommand("challenge_story", "虚构叙事信息查询"),
BotCommand("challenge_story_history", "虚构叙事历史信息查询"), BotCommand("challenge_story_history", "虚构叙事历史信息查询"),
BotCommand("challenge_boss", "末日幻想信息查询"),
BotCommand("challenge_boss_history", "末日幻想历史信息查询"),
BotCommand("rogue", "模拟宇宙信息查询"), BotCommand("rogue", "模拟宇宙信息查询"),
BotCommand("rogue_locust", "寰宇蝗灾信息查询"), BotCommand("rogue_locust", "寰宇蝗灾信息查询"),
BotCommand("cookies_import", "从其他 BOT 导入账号信息"), BotCommand("cookies_import", "从其他 BOT 导入账号信息"),

View File

@ -15,12 +15,14 @@ from core.services.history_data.services import (
HistoryDataAbyssServices, HistoryDataAbyssServices,
HistoryDataLedgerServices, HistoryDataLedgerServices,
HistoryDataChallengeStoryServices, HistoryDataChallengeStoryServices,
HistoryDataChallengeBossServices,
) )
from gram_core.basemodel import RegionEnum from gram_core.basemodel import RegionEnum
from gram_core.plugin import handler from gram_core.plugin import handler
from gram_core.services.cookies import CookiesService from gram_core.services.cookies import CookiesService
from gram_core.services.cookies.models import CookiesStatusEnum from gram_core.services.cookies.models import CookiesStatusEnum
from plugins.starrail.challenge import ChallengePlugin from plugins.starrail.challenge import ChallengePlugin
from plugins.starrail.challenge_boss import ChallengeBossPlugin
from plugins.starrail.challenge_story import ChallengeStoryPlugin from plugins.starrail.challenge_story import ChallengeStoryPlugin
from plugins.starrail.ledger import LedgerPlugin from plugins.starrail.ledger import LedgerPlugin
from plugins.tools.genshin import GenshinHelper, PlayerNotFoundError, CookiesNotFoundError from plugins.tools.genshin import GenshinHelper, PlayerNotFoundError, CookiesNotFoundError
@ -49,12 +51,14 @@ class RefreshHistoryJob(Plugin):
history_abyss: HistoryDataAbyssServices, history_abyss: HistoryDataAbyssServices,
history_data_abyss_story: HistoryDataChallengeStoryServices, history_data_abyss_story: HistoryDataChallengeStoryServices,
history_ledger: HistoryDataLedgerServices, history_ledger: HistoryDataLedgerServices,
history_data_abyss_boss: HistoryDataChallengeBossServices,
): ):
self.cookies = cookies self.cookies = cookies
self.genshin_helper = genshin_helper self.genshin_helper = genshin_helper
self.history_data_abyss = history_abyss self.history_data_abyss = history_abyss
self.history_data_abyss_story = history_data_abyss_story self.history_data_abyss_story = history_data_abyss_story
self.history_data_ledger = history_ledger self.history_data_ledger = history_ledger
self.history_data_abyss_boss = history_data_abyss_boss
@staticmethod @staticmethod
async def send_notice(context: "ContextTypes.DEFAULT_TYPE", user_id: int, notice_text: str): async def send_notice(context: "ContextTypes.DEFAULT_TYPE", user_id: int, notice_text: str):
@ -120,6 +124,19 @@ class RefreshHistoryJob(Plugin):
notice_text = NOTICE_TEXT % ("旅行札记历史记录", now, uid, "旅行札记历史记录") notice_text = NOTICE_TEXT % ("旅行札记历史记录", now, uid, "旅行札记历史记录")
await self.send_notice(context, user_id, notice_text) await self.send_notice(context, user_id, notice_text)
async def save_abyss_boss_data(self, client: "StarRailClient") -> bool:
uid = client.player_id
abyss_data = await client.get_starrail_challenge_boss(uid, previous=False, lang="zh-cn")
if abyss_data.has_data and abyss_data.groups:
group = abyss_data.groups[0]
return await ChallengeBossPlugin.save_abyss_data(self.history_data_abyss_boss, uid, abyss_data, group)
return False
async def send_abyss_boss_notice(self, context: "ContextTypes.DEFAULT_TYPE", user_id: int, uid: int):
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
notice_text = NOTICE_TEXT % ("末日幻影历史记录", now, uid, "挑战记录")
await self.send_notice(context, user_id, notice_text)
@handler.command(command="remove_same_history", block=False, admin=True) @handler.command(command="remove_same_history", block=False, admin=True)
async def remove_same_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"): async def remove_same_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
user = update.effective_user user = update.effective_user
@ -133,6 +150,8 @@ class RefreshHistoryJob(Plugin):
text += f"虚构叙事数据移除数量:{num3}\n" text += f"虚构叙事数据移除数量:{num3}\n"
num2 = await self.history_data_ledger.remove_same_data() num2 = await self.history_data_ledger.remove_same_data()
text += f"开拓月历数据移除数量:{num2}\n" text += f"开拓月历数据移除数量:{num2}\n"
num4 = await self.history_data_abyss_boss.remove_same_data()
text += f"末日幻影数据移除数量:{num4}\n"
await reply.edit_text(text) await reply.edit_text(text)
@handler.command(command="refresh_all_history", block=False, admin=True) @handler.command(command="refresh_all_history", block=False, admin=True)
@ -160,6 +179,8 @@ class RefreshHistoryJob(Plugin):
await self.send_abyss_story_notice(context, user_id, client.player_id) await self.send_abyss_story_notice(context, user_id, client.player_id)
if await self.save_ledger_data(client): if await self.save_ledger_data(client):
await self.send_ledger_notice(context, user_id, client.player_id) await self.send_ledger_notice(context, user_id, client.player_id)
if await self.save_abyss_boss_data(client):
await self.send_abyss_boss_notice(context, user_id, client.player_id)
except (InvalidCookies, PlayerNotFoundError, CookiesNotFoundError): except (InvalidCookies, PlayerNotFoundError, CookiesNotFoundError):
continue continue
except SimnetBadRequest as exc: except SimnetBadRequest as exc:

View File

@ -0,0 +1,655 @@
"""末日幻影数据查询"""
import asyncio
import math
import re
from functools import lru_cache, partial
from typing import Any, List, Optional, Tuple, Union, TYPE_CHECKING
from arkowrapper import ArkoWrapper
from pytz import timezone
from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction, ParseMode
from telegram.ext import CallbackContext, filters, ContextTypes
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.cookies.error import TooManyRequestPublicCookies
from core.services.history_data.models import HistoryDataChallengeBoss
from core.services.history_data.services import HistoryDataChallengeBossServices
from core.services.template.models import RenderGroupResult, RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
if TYPE_CHECKING:
from simnet import StarRailClient
from simnet.models.starrail.chronicle.challenge_boss import StarRailChallengeBoss, StarRailChallengeBossGroup
TZ = timezone("Asia/Shanghai")
cmd_pattern = r"(?i)^/challenge_boss(?:@[\w]+)?\s*((?:\d+)|(?:all))?\s*(pre)?"
msg_pattern = r"^末日幻影数据((?:查询)|(?:总览))(上期)?\D?(\d*)?.*?$"
MAX_FLOOR = 4
@lru_cache
def get_args(text: str) -> Tuple[int, bool, bool]:
if text.startswith("/"):
result = re.match(cmd_pattern, text).groups()
try:
floor = int(result[0] or 0)
if floor > 100:
floor = 0
except ValueError:
floor = 0
return floor, result[0] == "all", bool(result[1])
result = re.match(msg_pattern, text).groups()
return int(result[2] or 0), result[0] == "总览", result[1] == "上期"
class AbyssUnlocked(Exception):
"""根本没动"""
class AbyssFastPassed(Exception):
"""快速通过,无数据"""
class ChallengeBossPlugin(Plugin):
"""末日幻影数据查询"""
def __init__(
self,
template: TemplateService,
helper: GenshinHelper,
assets_service: AssetsService,
history_data_abyss: HistoryDataChallengeBossServices,
redis: RedisDB,
):
self.template_service = template
self.helper = helper
self.assets_service = assets_service
self.history_data_abyss = history_data_abyss
self.cache = RedisCache(redis.client, key="plugin:challenge_boss:history")
async def get_uid(self, user_id: int, reply: Optional[Message], player_id: int, offset: int) -> int:
"""通过消息获取 uid优先级args > reply > self"""
uid, user_id_ = player_id, user_id
if reply:
try:
user_id_ = reply.from_user.id
except AttributeError:
pass
if not uid:
player_info = await self.helper.players_service.get_player(user_id_, offset=offset)
if player_info is not None:
uid = player_info.player_id
if (not uid) and (user_id_ != user_id):
player_info = await self.helper.players_service.get_player(user_id, offset=offset)
if player_info is not None:
uid = player_info.player_id
return uid
@handler.command("challenge_boss", block=False)
@handler.message(filters.Regex(msg_pattern), block=False)
async def command_start(self, update: Update, _: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
uid: int = await self.get_uid(user_id, message.reply_to_message, uid, offset)
# 若查询帮助
if (message.text.startswith("/") and "help" in message.text) or "帮助" in message.text:
await message.reply_text(
"<b>末日幻影数据</b>功能使用帮助(中括号表示可选参数)\n\n"
"指令格式:\n<code>/challenge_boss + [层数/all] + [pre]</code>\n<code>pre</code>表示上期)\n\n"
"文本格式:\n<code>末日幻影数据 + 查询/总览 + [上期] + [层数]</code> \n\n"
"例如以下指令都正确:\n"
"<code>/challenge_boss</code>\n<code>/challenge_boss 1 pre</code>\n<code>/challenge_boss all pre</code>\n"
"<code>末日幻影数据查询</code>\n<code>末日幻影数据查询上期第1层</code>\n<code>末日幻影数据总览上期</code>",
parse_mode=ParseMode.HTML,
)
self.log_user(update, logger.info, "查询[bold]末日幻影数据[/bold]帮助", extra={"markup": True})
return
# 解析参数
floor, total, previous = get_args(message.text)
if floor > MAX_FLOOR or floor < 0:
reply_msg = await message.reply_text(
f"末日幻影层数输入错误,请重新输入。支持的参数为: 1-{MAX_FLOOR} 或 all"
)
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_msg)
self.add_delete_message_job(message)
return
self.log_user(
update,
logger.info,
"[bold]末日幻影挑战数据[/bold]请求: uid=%s floor=%s total=%s previous=%s",
uid,
floor,
total,
previous,
extra={"markup": True},
)
async def reply_message_func(content: str) -> None:
_reply_msg = await message.reply_text(f"开拓者 (<code>{uid}</code>) {content}", parse_mode=ParseMode.HTML)
reply_text: Optional[Message] = None
try:
async with self.helper.genshin_or_public(user_id, uid=uid) as client:
if total:
reply_text = await message.reply_text("彦卿需要时间整理末日幻影数据,还请耐心等待哦~")
await message.reply_chat_action(ChatAction.TYPING)
abyss_data, season = await self.get_rendered_pic_data(client, uid, previous)
images = await self.get_rendered_pic(abyss_data, season, uid, floor, total)
except TooManyRequestPublicCookies:
reply_message = await message.reply_text("查询次数太多,请您稍后重试")
if filters.ChatType.GROUPS.filter(message):
self.add_delete_message_job(reply_message)
self.add_delete_message_job(message)
return
except AbyssUnlocked: # 若末日幻影未解锁
await reply_message_func("还未解锁末日幻影哦~")
return
except AbyssFastPassed: # 若末日幻影已快速通过
await reply_message_func("本层已被快速通过,无详细数据~")
return
except IndexError: # 若末日幻影为挑战此层
await reply_message_func("还没有挑战本层呢,咕咕咕~")
return
except ValueError as e:
if uid:
await reply_message_func("UID 输入错误,请重新输入")
return
raise e
if images is None:
await reply_message_func(f"还没有第 {floor} 层的挑战数据")
return
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组
await RenderGroupResult(results=group).reply_media_group(message, write_timeout=60)
if reply_text is not None:
await reply_text.delete()
self.log_user(update, logger.info, "[bold]末日幻影挑战数据[/bold]: 成功发送图片", extra={"markup": True})
@staticmethod
def get_floor_data(abyss_data: "StarRailChallengeBoss", floor: int):
try:
floor_data = abyss_data.floors[-floor]
except IndexError:
floor_data = None
if not floor_data:
raise AbyssUnlocked()
if floor_data.is_fast:
raise AbyssFastPassed()
render_data = {
"floor": floor_data,
"floor_time": floor_data.last_update_time.datetime.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S"),
"floor_nodes": [floor_data.node_1, floor_data.node_2],
"floor_num": floor,
}
return render_data
async def get_rendered_pic_data(
self, client: "StarRailClient", uid: int, previous: bool
) -> Tuple["StarRailChallengeBoss", "StarRailChallengeBossGroup"]:
abyss_data = await client.get_starrail_challenge_boss(uid, previous=previous, lang="zh-cn")
group = None
if abyss_data.has_data and abyss_data.groups:
if previous:
if len(abyss_data.groups) > 1:
group = abyss_data.groups[1]
else:
group = abyss_data.groups[0]
await self.save_abyss_data(self.history_data_abyss, uid, abyss_data, group)
return abyss_data, group
async def get_rendered_pic(
self,
abyss_data: "StarRailChallengeBoss",
season: "StarRailChallengeBossGroup",
uid: int,
floor: int,
total: bool,
) -> Union[
Tuple[
Union[BaseException, Any],
Union[BaseException, Any],
Union[BaseException, Any],
Union[BaseException, Any],
Union[BaseException, Any],
],
List[RenderResult],
None,
]:
"""
获取渲染后的图片
Args:
abyss_data (StarRailChallengeStory): 末日幻影数据
season (StarRailChallengeStoryGroup): 末日幻影组数据
uid (int): 需要查询的 uid
floor (int): 层数
total (bool): 是否为总览
Returns:
bytes格式的图片
"""
if not abyss_data.has_data:
raise AbyssUnlocked()
if not season:
raise AbyssUnlocked()
start_time = season.begin_time.datetime.astimezone(TZ).strftime("%m月%d%H:%M")
end_time = season.end_time.datetime.astimezone(TZ).strftime("%m月%d%H:%M")
total_stars = f"{abyss_data.total_stars}"
render_data = {
"title": "末日幻影",
"start_time": start_time,
"end_time": end_time,
"stars": total_stars,
"uid": mask_number(uid),
"max_floor": abyss_data.max_floor,
"total_battles": abyss_data.total_battles,
"upper_boss": season.upper_boss,
"lower_boss": season.lower_boss,
"floor_colors": {
1: "#374952",
2: "#374952",
3: "#55464B",
4: "#55464B",
5: "#55464B",
6: "#1D2A5D",
7: "#1D2A5D",
8: "#1D2A5D",
9: "#292B58",
10: "#382024",
11: "#252550",
12: "#1D2A4A",
},
}
if total:
overview = await self.template_service.render(
"starrail/abyss/overview.html", render_data, viewport={"width": 750, "height": 250}
)
def floor_task(floor_index: int):
_abyss_data = self.get_floor_data(abyss_data, floor_index)
return (
floor_index,
self.template_service.render(
"starrail/abyss/floor_boss.html",
{
**render_data,
**_abyss_data,
},
viewport={"width": 690, "height": 500},
full_page=True,
ttl=15 * 24 * 60 * 60,
),
)
render_inputs = []
floors = abyss_data.floors[::-1]
for i in range(len(floors)):
try:
render_inputs.append(floor_task(i + 1))
except AbyssFastPassed:
pass
render_group_inputs = list(map(lambda x: x[1], sorted(render_inputs, key=lambda x: x[0])))
render_group_outputs = await asyncio.gather(*render_group_inputs)
render_group_outputs.insert(0, overview)
return render_group_outputs
if floor < 1:
return [
await self.template_service.render(
"starrail/abyss/overview.html", render_data, viewport={"width": 750, "height": 250}
)
]
try:
floor_data = abyss_data.floors[-floor]
except IndexError:
return None
if not floor_data:
return None
if floor_data.is_fast:
raise AbyssFastPassed()
render_data.update(self.get_floor_data(abyss_data, floor))
return [
await self.template_service.render(
"starrail/abyss/floor_boss.html", render_data, viewport={"width": 690, "height": 500}
)
]
@staticmethod
async def save_abyss_data(
history_data_abyss: "HistoryDataChallengeBossServices",
uid: int,
abyss_data: "StarRailChallengeBoss",
group: "StarRailChallengeBossGroup",
) -> bool:
if not group:
return False
model = history_data_abyss.create(uid, abyss_data, group)
old_data = await history_data_abyss.get_by_user_id_data_id(uid, model.data_id)
exists = history_data_abyss.exists_data(model, old_data)
if not exists:
await history_data_abyss.add(model)
return True
return False
async def get_abyss_data(self, uid: int):
return await self.history_data_abyss.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataChallengeBoss"):
last_battles = data.boss_data.floors[0]
start_time = last_battles.last_update_time.datetime.astimezone(TZ)
time = start_time.strftime("%Y.%m.%d")
name = ""
if "" in last_battles.name:
name = last_battles.name.split("")[0]
honor = ""
if data.boss_data.total_stars == 12:
fast_count = len([i for i in data.boss_data.floors if i.is_fast])
if data.boss_data.total_battles == (4 - fast_count):
honor = "👑"
num_of_characters = max(
len(last_battles.node_1.avatars),
len(last_battles.node_2.avatars),
)
if num_of_characters == 2:
honor = "双通"
elif num_of_characters == 1:
honor = "单通"
return f"{name} {time} {data.boss_data.total_stars}{honor}".strip()
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_abyss_data(uid)
data.sort(key=lambda x: x.id, reverse=True)
abyss_data = [HistoryDataChallengeBoss.from_data(i) for i in data]
buttons = [
{
"name": self.get_season_data_name(abyss_data[idx]),
"value": f"get_challenge_boss_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 2] for i in range(0, len(buttons), 2)]
send_buttons = all_buttons[(page - 1) * 7 : page * 7]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 7)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@staticmethod
async def gen_floor_button(
data_id: int,
abyss_data: "HistoryDataChallengeBoss",
user_id: int,
uid: int,
) -> List[List[InlineKeyboardButton]]:
max_floors = len(abyss_data.boss_data.floors)
buttons = [
InlineKeyboardButton(
f"{i + 1}",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|{data_id}|{i + 1}",
)
for i in range(0, max_floors)
if not abyss_data.boss_data.floors[max_floors - 1 - i].is_fast
]
send_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)]
all_buttons = [
InlineKeyboardButton(
"<< 返回",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|p_1",
),
InlineKeyboardButton(
"总览",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|{data_id}|total",
),
InlineKeyboardButton(
"所有",
callback_data=f"get_challenge_boss_history|{user_id}|{uid}|{data_id}|all",
),
]
send_buttons.append(all_buttons)
return send_buttons
@handler.command("challenge_boss_history", block=False)
@handler.message(filters.Regex(r"^末日幻影历史数据"), block=False)
async def challenge_boss_history_command_start(self, update: Update, _: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid, offset = self.get_real_uid_or_offset(update)
uid: int = await self.get_uid(user_id, message.reply_to_message, uid, offset)
self.log_user(update, logger.info, "查询末日幻影历史数据 uid[%s]", uid)
async with self.helper.genshin_or_public(user_id, uid=uid) as _:
await self.get_session_button_data(user_id, uid, force=True)
buttons = await self.gen_season_button(user_id, uid)
if not buttons:
await message.reply_text("还没有末日幻影历史数据哦~")
return
await message.reply_text("请选择要查询的末日幻影历史数据", reply_markup=InlineKeyboardMarkup(buttons))
async def get_challenge_boss_history_page(self, update: "Update", user_id: int, uid: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换末日幻影历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin_or_public(user_id) as _:
buttons = await self.gen_season_button(user_id, uid, page)
if not buttons:
await callback_query.answer("还没有末日幻影历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有末日幻影历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
async def get_challenge_boss_history_season(self, update: "Update", data_id: int):
"""进入选择层数"""
callback_query = update.callback_query
user = callback_query.from_user
self.log_user(update, logger.info, "切换末日幻影历史数据到层数页 data_id[%s]", data_id)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令~", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataChallengeBoss.from_data(data)
buttons = await self.gen_floor_button(data_id, abyss_data, user.id, data.user_id)
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer("已切换到层数页", show_alert=False)
async def get_challenge_boss_history_floor(self, update: "Update", data_id: int, detail: str):
"""渲染层数数据"""
callback_query = update.callback_query
message = callback_query.message
reply = None
if message.reply_to_message:
reply = message.reply_to_message
floor = 0
total = False
if detail == "total":
floor = 0
elif detail == "all":
total = True
else:
floor = int(detail)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataChallengeBoss.from_data(data)
images = await self.get_rendered_pic(abyss_data.boss_data, abyss_data.group, data.user_id, floor, total)
if images is None:
await callback_query.answer(f"还没有第 {floor} 层的挑战数据", show_alert=True)
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组
await RenderGroupResult(results=group).reply_media_group(reply or message, write_timeout=60)
self.log_user(update, logger.info, "[bold]末日幻影挑战数据[/bold]: 成功发送图片", extra={"markup": True})
self.add_delete_message_job(message, delay=1)
@handler.callback_query(pattern=r"^get_challenge_boss_history\|", block=False)
async def get_challenge_boss_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
async def get_challenge_boss_history_callback(
callback_query_data: str,
) -> Tuple[str, str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
_detail = _data[4] if len(_data) > 4 else None
logger.debug(
"callback_query_data函数返回 detail[%s] result[%s] user_id[%s] uid[%s]",
_detail,
_result,
_user_id,
_uid,
)
return _detail, _result, _user_id, _uid
detail, result, user_id, uid = await get_challenge_boss_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_challenge_boss_history_page(update, user_id, uid, result)
return
data_id = int(result)
if detail:
await self.get_challenge_boss_history_floor(update, data_id, detail)
return
await self.get_challenge_boss_history_season(update, data_id)
async def challenge_boss_use_by_inline(
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE", previous: bool
):
callback_query = update.callback_query
user = update.effective_user
user_id = user.id
uid = IInlineUseData.get_uid_from_context(context)
self.log_user(update, logger.info, "查询末日幻影挑战总览数据 previous[%s]", previous)
notice = None
try:
async with self.helper.genshin_or_public(user_id, uid=uid) as client:
if not client.public:
await client.get_record_cards()
abyss_data, season = await self.get_rendered_pic_data(client, uid, previous)
images = await self.get_rendered_pic(abyss_data, season, uid, 0, False)
image = images[0]
except AbyssUnlocked: # 若深渊未解锁
notice = "还未解锁末日幻影哦~"
except TooManyRequestPublicCookies:
notice = "查询次数太多,请您稍后重试"
if notice:
await callback_query.answer(notice, show_alert=True)
return
await image.edit_inline_media(callback_query)
async def get_inline_use_data(self) -> List[Optional[IInlineUseData]]:
return [
IInlineUseData(
text="本期末日幻影总览",
hash="challenge_boss_current",
callback=partial(self.challenge_boss_use_by_inline, previous=False),
player=True,
),
IInlineUseData(
text="上期末日幻影总览",
hash="challenge_boss_previous",
callback=partial(self.challenge_boss_use_by_inline, previous=True),
player=True,
),
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 275 KiB

View File

@ -0,0 +1,111 @@
<!DOCTYPE html>
<html lang="zh-cn">
<head>
<meta charset="UTF-8">
<title>floor</title>
<link type="text/css" href="./style.css" rel="stylesheet"/>
<link type="text/css" href="../../styles/public.css" rel="stylesheet"/>
<style>
body {
margin: 0;
padding: 0;
}
.floors, .floor {
border-radius: unset;
margin: 0;
}
.floor-num > div:last-child {
display: flex;
flex-flow: column;
justify-content: center;
align-content: center;
}
</style>
</head>
<body>
<div class="container2">
<div class="floors">
<div class="floor floor-abyss-boss">
<div class="head">
<div class="floor-name">
<div class="floor-num"></div>
<div>
<div>UID: {{ uid }}</div>
<div>{{ title }}·{{ floor.name }}</div>
</div>
</div>
<div class="star">
<div>
<div>{{ floor.star_num }}</div>
<div class="score">总分:{{ floor.score }}</div>
</div>
</div>
</div>
<div class="hr"></div>
<div class="chamber">
<div class="chamber-info">
<div>
<span style="color: #A3A3A3">{{ floor_time }}</span>
</div>
<div class="stars">
{% for n in range(floor.star_num) %}
<div class="star"></div>
{% endfor %}
</div>
</div>
<div class="battles">
{% for node in floor_nodes %}
<div class="battle">
{% for character in node.avatars %}
<div class="character">
{% if character.rank > 0 %}
{% set constellation = character.rank %}
{% set bg = ['blue', 'blue', 'green', 'green', 'red', 'red'][constellation - 1] %}
<div style="background-color: var(--{{ bg }})">
{{ constellation }} 命
</div>
{% endif %}
<div class="element" style="background-image: url('../../img/element/{{ character.element }}.png')"></div>
<div class="icon"
style="background-image: url('../../background/rarity/half/{{ character.rarity }}.png')">
<img src="{{ character.icon }}" alt=""/>
</div>
<div class="caption">Lv.{{ character.level }}</div>
</div>
{% endfor %}
{% if loop.length > 1 %}
<div class="battle-info">
<div>节点{{ loop.index }}</div>
<br/>
<div class="score">积分:{{ node.score }}</div>
<br/>
{% if node.boss_defeated %}
<div>已击败首领</div>
{% else %}
<div>未击败首领</div>
{% endif %}
</div>
{% endif %}
</div>
<div class="buffs">
<div class="buff-item">
<img src="{{ node.buff.icon }}" alt="buff-item" class="buff-item-icon">
<p>
<span class="buff-item-name">{{ node.buff.name_mi18n }}</span>
<span class="buff-item-desc">{{ node.buff.desc_mi18n }}</span>
</p>
</div>
</div>
{% if loop.index < loop.length %}
<div class="hr"></div>
{% endif %}
{% endfor %}
</div>
</div>
</div>
</div>
</div>
</body>
</html>

View File

@ -14,18 +14,34 @@
.overview { .overview {
border-radius: unset; border-radius: unset;
} }
{% if title == '末日幻影' %}
.boss-upper::before {
background-image: url("{{ upper_boss.icon }}");
}
.boss-lower::before {
background-image: url("{{ lower_boss.icon }}");
}
{% endif %}
</style> </style>
</head> </head>
<body> <body>
<div class="container"> <div class="container">
{% set summarize_class = '' %}
{% if title == '混沌回忆' %} {% if title == '混沌回忆' %}
{% set overview_class = 'overview-abyss' %} {% set overview_class = 'overview-abyss' %}
{% else %} {% endif %}
{% if title == '虚构叙事' %}
{% set overview_class = 'overview-abyss-story' %} {% set overview_class = 'overview-abyss-story' %}
{% endif %} {% endif %}
{% if title == '末日幻影' %}
{% set overview_class = 'overview-abyss-boss' %}
{% set summarize_class = 'summarize-boss' %}
{% endif %}
<div class="overview {{ overview_class }}"> <div class="overview {{ overview_class }}">
<div class="title">{{ title }}挑战回顾</div> <div class="title">{{ title }}挑战回顾</div>
<div class="summarize"> <div class="summarize {{ summarize_class }}">
<div> <div>
<div>UID: {{ uid }}</div> <div>UID: {{ uid }}</div>
<div class="star"> <div class="star">
@ -40,6 +56,16 @@
<div>本期结束: {{ end_time }}</div> <div>本期结束: {{ end_time }}</div>
<div>战斗次数: {{ total_battles }}</div> <div>战斗次数: {{ total_battles }}</div>
</div> </div>
{% if title == '末日幻影' %}
<div>
<div class="star boss-upper">
<span>{{ upper_boss.name_mi18n }}</span>
</div>
<div class="star boss-lower">
<span>{{ lower_boss.name_mi18n }}</span>
</div>
</div>
{% endif %}
</div> </div>
</div> </div>
</div> </div>

View File

@ -46,7 +46,7 @@ body {
/* 概览 */ /* 概览 */
.overview { .overview {
height: 250px; /*height: 250px;*/
padding: 20px 40px; padding: 20px 40px;
background-size: cover; background-size: cover;
background-repeat: no-repeat; background-repeat: no-repeat;
@ -64,6 +64,10 @@ body {
background-image: linear-gradient(to top, rgb(0 0 0 / 10%), rgb(0 0 0 / 10%)), url("./background/abyss-story-bg-grad.png"); background-image: linear-gradient(to top, rgb(0 0 0 / 10%), rgb(0 0 0 / 10%)), url("./background/abyss-story-bg-grad.png");
} }
.overview-abyss-boss {
background-image: linear-gradient(to top, rgb(0 0 0 / 10%), rgb(0 0 0 / 10%)), url("./background/abyss-boss-bg-grad.png");
}
.summarize { .summarize {
font-size: 20px; font-size: 20px;
width: calc(100% - 90px); width: calc(100% - 90px);
@ -81,6 +85,14 @@ body {
backdrop-filter: blur(5px); backdrop-filter: blur(5px);
} }
.summarize-boss {
height: 200px;
}
.summarize-boss > div {
height: 25%!important;
}
.summarize > div { .summarize > div {
width: 100%; width: 100%;
height: 33.3%; height: 33.3%;
@ -267,6 +279,12 @@ body {
background-color: #1c232c; background-color: #1c232c;
} }
.floor-abyss-boss {
background-image: url('./background/abyss-boss-bg-grad.png');
background-size: contain;
background-color: #1c232c;
}
.head { .head {
width: 100%; width: 100%;
height: 160px; height: 160px;