Support query challenge history

This commit is contained in:
omg-xtao 2024-04-27 19:46:18 +08:00 committed by GitHub
parent bf11cbed97
commit b605e511d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 733 additions and 30 deletions

View File

@ -0,0 +1,52 @@
"""history_data
Revision ID: 87c6195e5306
Revises: 369fb74daad9
Create Date: 2024-04-26 22:57:42.309397
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "87c6195e5306"
down_revision = "369fb74daad9"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"history_data",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("user_id", sa.BigInteger(), nullable=False),
sa.Column("data_id", sa.BigInteger(), nullable=True),
sa.Column(
"time_created",
sa.DateTime(),
server_default=sa.text("now()"),
nullable=True,
),
sa.Column("time_updated", sa.DateTime(), nullable=True),
sa.Column("type", sa.Integer(), nullable=False),
sa.Column("data", sa.JSON(), nullable=True),
sa.PrimaryKeyConstraint("id", "user_id", "type"),
mysql_charset="utf8mb4",
mysql_collate="utf8mb4_general_ci",
)
op.create_index(
op.f("ix_history_data_user_id"),
"history_data",
["user_id"],
unique=False,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_history_data_user_id"), table_name="history_data")
op.drop_table("history_data")
# ### end Alembic commands ###

View File

View File

@ -0,0 +1,36 @@
import enum
from pydantic import BaseModel
from simnet.models.starrail.chronicle.challenge import StarRailChallenge
from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup
from gram_core.services.history_data.models import HistoryData
__all__ = (
"HistoryData",
"HistoryDataTypeEnum",
"HistoryDataAbyss",
"HistoryDataChallengeStory",
)
class HistoryDataTypeEnum(int, enum.Enum):
ABYSS = 0 # 混沌回忆
CHALLENGE_STORY = 1 # 虚构叙事
class HistoryDataAbyss(BaseModel):
abyss_data: StarRailChallenge
@classmethod
def from_data(cls, data: HistoryData) -> "HistoryDataAbyss":
return cls.parse_obj(data.data)
class HistoryDataChallengeStory(BaseModel):
story_data: StarRailChallengeStory
group: StarRailChallengeStoryGroup
@classmethod
def from_data(cls, data: HistoryData) -> "HistoryDataChallengeStory":
return cls.parse_obj(data.data)

View File

@ -0,0 +1,3 @@
from gram_core.services.history_data.repositories import HistoryDataRepository
__all__ = ("HistoryDataRepository",)

View File

@ -0,0 +1,77 @@
import datetime
from typing import List
from pytz import timezone
from simnet.models.starrail.chronicle.challenge import StarRailChallenge
from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup
from core.services.history_data.models import (
HistoryData,
HistoryDataTypeEnum,
HistoryDataAbyss,
HistoryDataChallengeStory,
)
from gram_core.base_service import BaseService
from gram_core.services.history_data.services import HistoryDataBaseServices
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
__all__ = (
"HistoryDataBaseServices",
"HistoryDataAbyssServices",
"HistoryDataChallengeStoryServices",
)
TZ = timezone("Asia/Shanghai")
def json_encoder(value):
if isinstance(value, datetime.datetime):
return value.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S")
return value
class HistoryDataAbyssServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.ABYSS.value
@staticmethod
def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool:
return any(d.data == data.data for d in old_data)
@staticmethod
def create(user_id: int, abyss_data: StarRailChallenge):
data = HistoryDataAbyss(abyss_data=abyss_data)
json_data = data.json(by_alias=True, encoder=json_encoder)
return HistoryData(
user_id=user_id,
data_id=abyss_data.season,
time_created=datetime.datetime.now(),
type=HistoryDataAbyssServices.DATA_TYPE,
data=jsonlib.loads(json_data),
)
class HistoryDataChallengeStoryServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.CHALLENGE_STORY.value
@staticmethod
def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool:
return any(d.data == data.data for d in old_data)
@staticmethod
def create(user_id: int, story_data: StarRailChallengeStory, group: StarRailChallengeStoryGroup):
data = HistoryDataChallengeStory(story_data=story_data, group=group)
json_data = data.json(by_alias=True, encoder=json_encoder)
dict_data = jsonlib.loads(json_data)
dict_data["story_data"]["groups"] = []
return HistoryData(
user_id=user_id,
data_id=group.season,
time_created=datetime.datetime.now(),
type=HistoryDataChallengeStoryServices.DATA_TYPE,
data=dict_data,
)

@ -1 +1 @@
Subproject commit 14bc3c5a191e13873cae57a5fe36d35641d51491
Subproject commit 481770502884afbce7d6b38f0448fec9e749885e

View File

@ -44,18 +44,11 @@ class SetCommandPlugin(Plugin):
BotCommand("strategy", "角色攻略查询"),
BotCommand("material", "角色培养素材查询"),
BotCommand("challenge", "混沌回忆信息查询"),
BotCommand("challenge_history", "混沌回忆历史信息查询"),
BotCommand("challenge_story", "虚构叙事信息查询"),
BotCommand("challenge_story_history", "虚构叙事历史信息查询"),
BotCommand("rogue", "模拟宇宙信息查询"),
BotCommand("rogue_locust", "寰宇蝗灾信息查询"),
BotCommand("museum", "冬城博物珍奇簿信息查询"),
BotCommand("fantastic_story", "评书奇谭信息查询"),
BotCommand("treasure_dungeon", "地城探宝信息查询"),
BotCommand("copper_man", "金人巷信息查询"),
BotCommand("yitai_battle", "以太战线信息查询"),
BotCommand("endless_side", "无尽位面信息查询"),
BotCommand("fox_story", "狐斋志异信息查询"),
BotCommand("boxing_show", "斗技表演赛信息查询"),
BotCommand("space_zoo", "异宠拾遗信息查询"),
BotCommand("cookies_import", "从其他 BOT 导入账号信息"),
BotCommand("cookies_export", "导出账号信息给其他 BOT"),
]

View File

@ -1,22 +1,28 @@
"""混沌回忆数据查询"""
import asyncio
import math
import re
from functools import lru_cache
from typing import Any, List, Optional, Tuple, Union, TYPE_CHECKING
from arkowrapper import ArkoWrapper
from pytz import timezone
from telegram import Message, Update
from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction, ParseMode
from telegram.ext import CallbackContext, filters
from telegram.ext import CallbackContext, filters, ContextTypes
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.cookies.error import TooManyRequestPublicCookies
from core.services.history_data.models import HistoryDataAbyss
from core.services.history_data.services import HistoryDataAbyssServices
from core.services.template.models import RenderGroupResult, RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
@ -68,10 +74,14 @@ class ChallengePlugin(Plugin):
template: TemplateService,
helper: GenshinHelper,
assets_service: AssetsService,
history_data_abyss: HistoryDataAbyssServices,
redis: RedisDB,
):
self.template_service = template
self.helper = helper
self.assets_service = assets_service
self.history_data_abyss = history_data_abyss
self.cache = RedisCache(redis.client, key="plugin:challenge:history")
async def get_uid(self, user_id: int, args: List[str], reply: Optional[Message]) -> int:
"""通过消息获取 uid优先级args > reply > self"""
@ -149,7 +159,8 @@ class ChallengePlugin(Plugin):
if total:
reply_text = await message.reply_text("彦卿需要时间整理混沌回忆数据,还请耐心等待哦~")
await message.reply_chat_action(ChatAction.TYPING)
images = await self.get_rendered_pic(client, uid, floor, total, previous)
abyss_data = await self.get_rendered_pic_data(client, uid, previous)
images = await self.get_rendered_pic(abyss_data, uid, floor, total)
except TooManyRequestPublicCookies:
reply_message = await message.reply_text("查询次数太多,请您稍后重试")
if filters.ChatType.GROUPS.filter(message):
@ -199,8 +210,14 @@ class ChallengePlugin(Plugin):
}
return render_data
async def get_rendered_pic(
self, client: "StarRailClient", uid: int, floor: int, total: bool, previous: bool
async def get_rendered_pic_data(self, client: "StarRailClient", uid: int, previous: bool) -> "StarRailChallenge":
abyss_data = await client.get_starrail_challenge(uid, previous=previous, lang="zh-cn")
if abyss_data.has_data:
await self.save_abyss_data(uid, abyss_data)
return abyss_data
async def get_rendered_pic( # skipcq: PY-R1000 #
self, abyss_data: "StarRailChallenge", uid: int, floor: int, total: bool
) -> Union[
Tuple[
Union[BaseException, Any],
@ -216,17 +233,15 @@ class ChallengePlugin(Plugin):
获取渲染后的图片
Args:
client (Client): 获取 genshin 数据的 client
abyss_data (StarRailChallenge): 混沌回忆数据
uid (int): 需要查询的 uid
floor (int): 层数
total (bool): 是否为总览
previous (bool): 是否为上期
Returns:
bytes格式的图片
"""
abyss_data = await client.get_starrail_challenge(uid, previous=previous, lang="zh-cn")
if not abyss_data.has_data:
raise AbyssUnlocked()
start_time = abyss_data.begin_time.datetime.astimezone(TZ).strftime("%m月%d%H:%M")
@ -309,3 +324,252 @@ class ChallengePlugin(Plugin):
"starrail/abyss/floor.html", render_data, viewport={"width": 690, "height": 500}
)
]
async def save_abyss_data(self, uid: int, abyss_data: "StarRailChallenge"):
model = self.history_data_abyss.create(uid, abyss_data)
old_data = await self.history_data_abyss.get_by_user_id_data_id(uid, model.data_id)
exists = self.history_data_abyss.exists_data(model, old_data)
if not exists:
await self.history_data_abyss.add(model)
async def get_abyss_data(self, uid: int):
return await self.history_data_abyss.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataAbyss"):
start_time = data.abyss_data.begin_time.datetime.astimezone(TZ)
time = start_time.strftime("%Y.%m.%d")
honor = ""
if data.abyss_data.total_stars == 36:
if data.abyss_data.total_battles == 12:
honor = "👑"
last_battles = data.abyss_data.floors[-1]
num_of_characters = max(
len(last_battles.node_1.avatars),
len(last_battles.node_2.avatars),
)
if num_of_characters == 2:
honor = "双通"
elif num_of_characters == 1:
honor = "单通"
return f"{time} {data.abyss_data.total_stars}{honor}"
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_abyss_data(uid)
data.sort(key=lambda x: x.id, reverse=True)
abyss_data = [HistoryDataAbyss.from_data(i) for i in data]
buttons = [
{
"name": self.get_season_data_name(abyss_data[idx]),
"value": f"get_abyss_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
send_buttons = all_buttons[(page - 1) * 5 : page * 5]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 5)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_abyss_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_abyss_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_abyss_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@staticmethod
async def gen_floor_button(
data_id: int,
abyss_data: "HistoryDataAbyss",
user_id: int,
uid: int,
) -> List[List[InlineKeyboardButton]]:
max_floors = len(abyss_data.abyss_data.floors)
bypass_floors = len([i for i in abyss_data.abyss_data.floors if i.is_fast])
buttons = [
InlineKeyboardButton(
f"{i}",
callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|{i}",
)
for i in range(bypass_floors + 1, max_floors + 1)
]
send_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)]
all_buttons = [
InlineKeyboardButton(
"<< 返回",
callback_data=f"get_abyss_history|{user_id}|{uid}|p_1",
),
InlineKeyboardButton(
"总览",
callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|total",
),
InlineKeyboardButton(
"所有",
callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|all",
),
]
send_buttons.append(all_buttons)
return send_buttons
@handler.command("challenge_history", block=False)
@handler.message(filters.Regex(r"^混沌回忆历史数据"), block=False)
async def abyss_history_command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid: int = await self.get_uid(user_id, context.args, message.reply_to_message)
self.log_user(update, logger.info, "查询混沌回忆历史数据 uid[%s]", uid)
async with self.helper.genshin_or_public(user_id, uid=uid) as _:
await self.get_session_button_data(user_id, uid, force=True)
buttons = await self.gen_season_button(user_id, uid)
if not buttons:
await message.reply_text("还没有混沌回忆历史数据哦~")
return
await message.reply_text("请选择要查询的混沌回忆历史数据", reply_markup=InlineKeyboardMarkup(buttons))
async def get_abyss_history_page(self, update: "Update", user_id: int, uid: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换混沌回忆历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin_or_public(user_id) as _:
buttons = await self.gen_season_button(user_id, uid, page)
if not buttons:
await callback_query.answer("还没有混沌回忆历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有混沌回忆历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
async def get_abyss_history_season(self, update: "Update", data_id: int):
"""进入选择层数"""
callback_query = update.callback_query
user = callback_query.from_user
self.log_user(update, logger.info, "切换混沌回忆历史数据到层数页 data_id[%s]", data_id)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令~", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataAbyss.from_data(data)
buttons = await self.gen_floor_button(data_id, abyss_data, user.id, data.user_id)
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer("已切换到层数页", show_alert=False)
async def get_abyss_history_floor(self, update: "Update", data_id: int, detail: str):
"""渲染层数数据"""
callback_query = update.callback_query
message = callback_query.message
floor = 0
total = False
if detail == "total":
floor = 0
elif detail == "all":
total = True
else:
floor = int(detail)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataAbyss.from_data(data)
images = await self.get_rendered_pic(abyss_data.abyss_data, data.user_id, floor, total)
if images is None:
await callback_query.answer(f"还没有第 {floor} 层的挑战数据", show_alert=True)
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组
await RenderGroupResult(results=group).reply_media_group(
message, allow_sending_without_reply=True, write_timeout=60
)
self.log_user(update, logger.info, "[bold]混沌回忆挑战数据[/bold]: 成功发送图片", extra={"markup": True})
self.add_delete_message_job(message, delay=1)
@handler.callback_query(pattern=r"^get_abyss_history\|", block=False)
async def get_abyss_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
async def get_abyss_history_callback(
callback_query_data: str,
) -> Tuple[str, str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
_detail = _data[4] if len(_data) > 4 else None
logger.debug(
"callback_query_data函数返回 detail[%s] result[%s] user_id[%s] uid[%s]",
_detail,
_result,
_user_id,
_uid,
)
return _detail, _result, _user_id, _uid
detail, result, user_id, uid = await get_abyss_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_abyss_history_page(update, user_id, uid, result)
return
data_id = int(result)
if detail:
await self.get_abyss_history_floor(update, data_id, detail)
return
await self.get_abyss_history_season(update, data_id)

View File

@ -1,22 +1,28 @@
"""虚构叙事数据查询"""
import asyncio
import math
import re
from functools import lru_cache
from typing import Any, List, Optional, Tuple, Union, TYPE_CHECKING
from arkowrapper import ArkoWrapper
from pytz import timezone
from telegram import Message, Update
from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction, ParseMode
from telegram.ext import CallbackContext, filters
from telegram.ext import CallbackContext, filters, ContextTypes
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.cookies.error import TooManyRequestPublicCookies
from core.services.history_data.models import HistoryDataChallengeStory
from core.services.history_data.services import HistoryDataChallengeStoryServices
from core.services.template.models import RenderGroupResult, RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
@ -28,8 +34,7 @@ except ImportError:
if TYPE_CHECKING:
from simnet import StarRailClient
from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory
from simnet.models.starrail.chronicle.challenge_story import StarRailChallengeStory, StarRailChallengeStoryGroup
TZ = timezone("Asia/Shanghai")
cmd_pattern = r"(?i)^/challenge_story(?:@[\w]+)?\s*((?:\d+)|(?:all))?\s*(pre)?"
@ -68,10 +73,14 @@ class ChallengeStoryPlugin(Plugin):
template: TemplateService,
helper: GenshinHelper,
assets_service: AssetsService,
history_data_abyss: HistoryDataChallengeStoryServices,
redis: RedisDB,
):
self.template_service = template
self.helper = helper
self.assets_service = assets_service
self.history_data_abyss = history_data_abyss
self.cache = RedisCache(redis.client, key="plugin:challenge_story:history")
async def get_uid(self, user_id: int, args: List[str], reply: Optional[Message]) -> int:
"""通过消息获取 uid优先级args > reply > self"""
@ -149,7 +158,8 @@ class ChallengeStoryPlugin(Plugin):
if total:
reply_text = await message.reply_text("彦卿需要时间整理虚构叙事数据,还请耐心等待哦~")
await message.reply_chat_action(ChatAction.TYPING)
images = await self.get_rendered_pic(client, uid, floor, total, previous)
abyss_data, season = await self.get_rendered_pic_data(client, uid, previous)
images = await self.get_rendered_pic(abyss_data, season, uid, floor, total)
except TooManyRequestPublicCookies:
reply_message = await message.reply_text("查询次数太多,请您稍后重试")
if filters.ChatType.GROUPS.filter(message):
@ -199,8 +209,23 @@ class ChallengeStoryPlugin(Plugin):
}
return render_data
async def get_rendered_pic_data(
self, client: "StarRailClient", uid: int, previous: bool
) -> Tuple["StarRailChallengeStory", "StarRailChallengeStoryGroup"]:
abyss_data = await client.get_starrail_challenge_story(uid, previous=previous, lang="zh-cn")
group = None
if abyss_data.has_data and abyss_data.groups:
group = abyss_data.groups[1] if previous else abyss_data.groups[0]
await self.save_abyss_data(uid, abyss_data, group)
return abyss_data, group
async def get_rendered_pic(
self, client: "StarRailClient", uid: int, floor: int, total: bool, previous: bool
self,
abyss_data: "StarRailChallengeStory",
season: "StarRailChallengeStoryGroup",
uid: int,
floor: int,
total: bool,
) -> Union[
Tuple[
Union[BaseException, Any],
@ -216,22 +241,20 @@ class ChallengeStoryPlugin(Plugin):
获取渲染后的图片
Args:
client (Client): 获取 genshin 数据的 client
abyss_data (StarRailChallengeStory): 虚构叙事数据
season (StarRailChallengeStoryGroup): 虚构叙事组数据
uid (int): 需要查询的 uid
floor (int): 层数
total (bool): 是否为总览
previous (bool): 是否为上期
Returns:
bytes格式的图片
"""
abyss_data = await client.get_starrail_challenge_story(uid, previous=previous, lang="zh-cn")
if not abyss_data.has_data:
raise AbyssUnlocked()
if not abyss_data.groups:
if not season:
raise AbyssUnlocked()
season = abyss_data.groups[0]
start_time = season.begin_time.datetime.astimezone(TZ).strftime("%m月%d%H:%M")
end_time = season.end_time.datetime.astimezone(TZ).strftime("%m月%d%H:%M")
total_stars = f"{abyss_data.total_stars}"
@ -260,6 +283,9 @@ class ChallengeStoryPlugin(Plugin):
},
}
if total:
overview = await self.template_service.render(
"starrail/abyss/overview.html", render_data, viewport={"width": 750, "height": 250}
)
def floor_task(floor_index: int):
_abyss_data = self.get_floor_data(abyss_data, floor_index)
@ -287,7 +313,9 @@ class ChallengeStoryPlugin(Plugin):
render_group_inputs = list(map(lambda x: x[1], sorted(render_inputs, key=lambda x: x[0])))
return await asyncio.gather(*render_group_inputs)
render_group_outputs = await asyncio.gather(*render_group_inputs)
render_group_outputs.insert(0, overview)
return render_group_outputs
if floor < 1:
return [
@ -309,3 +337,253 @@ class ChallengeStoryPlugin(Plugin):
"starrail/abyss/floor_story.html", render_data, viewport={"width": 690, "height": 500}
)
]
async def save_abyss_data(
self, uid: int, abyss_data: "StarRailChallengeStory", group: "StarRailChallengeStoryGroup"
):
model = self.history_data_abyss.create(uid, abyss_data, group)
old_data = await self.history_data_abyss.get_by_user_id_data_id(uid, model.data_id)
exists = self.history_data_abyss.exists_data(model, old_data)
if not exists:
await self.history_data_abyss.add(model)
async def get_abyss_data(self, uid: int):
return await self.history_data_abyss.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataChallengeStory"):
start_time = data.group.begin_time.datetime.astimezone(TZ)
time = start_time.strftime("%Y.%m.%d")
honor = ""
if data.story_data.total_stars == 12:
if data.story_data.total_battles == 4:
honor = "👑"
last_battles = data.story_data.floors[-1]
num_of_characters = max(
len(last_battles.node_1.avatars),
len(last_battles.node_2.avatars),
)
if num_of_characters == 2:
honor = "双通"
elif num_of_characters == 1:
honor = "单通"
return f"{time} {data.story_data.total_stars}{honor}"
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_abyss_data(uid)
data.sort(key=lambda x: x.id, reverse=True)
abyss_data = [HistoryDataChallengeStory.from_data(i) for i in data]
buttons = [
{
"name": self.get_season_data_name(abyss_data[idx]),
"value": f"get_challenge_story_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
send_buttons = all_buttons[(page - 1) * 5 : page * 5]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 5)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@staticmethod
async def gen_floor_button(
data_id: int,
abyss_data: "HistoryDataChallengeStory",
user_id: int,
uid: int,
) -> List[List[InlineKeyboardButton]]:
max_floors = len(abyss_data.story_data.floors)
buttons = [
InlineKeyboardButton(
f"{i}",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|{data_id}|{i}",
)
for i in range(1, max_floors + 1)
]
send_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)]
all_buttons = [
InlineKeyboardButton(
"<< 返回",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|p_1",
),
InlineKeyboardButton(
"总览",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|{data_id}|total",
),
InlineKeyboardButton(
"所有",
callback_data=f"get_challenge_story_history|{user_id}|{uid}|{data_id}|all",
),
]
send_buttons.append(all_buttons)
return send_buttons
@handler.command("challenge_story_history", block=False)
@handler.message(filters.Regex(r"^虚构叙事历史数据"), block=False)
async def challenge_story_history_command_start(self, update: Update, context: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
uid: int = await self.get_uid(user_id, context.args, message.reply_to_message)
self.log_user(update, logger.info, "查询虚构叙事历史数据 uid[%s]", uid)
async with self.helper.genshin_or_public(user_id, uid=uid) as _:
await self.get_session_button_data(user_id, uid, force=True)
buttons = await self.gen_season_button(user_id, uid)
if not buttons:
await message.reply_text("还没有虚构叙事历史数据哦~")
return
await message.reply_text("请选择要查询的虚构叙事历史数据", reply_markup=InlineKeyboardMarkup(buttons))
async def get_challenge_story_history_page(self, update: "Update", user_id: int, uid: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换虚构叙事历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin_or_public(user_id) as _:
buttons = await self.gen_season_button(user_id, uid, page)
if not buttons:
await callback_query.answer("还没有虚构叙事历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有虚构叙事历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
async def get_challenge_story_history_season(self, update: "Update", data_id: int):
"""进入选择层数"""
callback_query = update.callback_query
user = callback_query.from_user
self.log_user(update, logger.info, "切换虚构叙事历史数据到层数页 data_id[%s]", data_id)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令~", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataChallengeStory.from_data(data)
buttons = await self.gen_floor_button(data_id, abyss_data, user.id, data.user_id)
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer("已切换到层数页", show_alert=False)
async def get_challenge_story_history_floor(self, update: "Update", data_id: int, detail: str):
"""渲染层数数据"""
callback_query = update.callback_query
message = callback_query.message
floor = 0
total = False
if detail == "total":
floor = 0
elif detail == "all":
total = True
else:
floor = int(detail)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataChallengeStory.from_data(data)
images = await self.get_rendered_pic(abyss_data.story_data, abyss_data.group, data.user_id, floor, total)
if images is None:
await callback_query.answer(f"还没有第 {floor} 层的挑战数据", show_alert=True)
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组
await RenderGroupResult(results=group).reply_media_group(
message, allow_sending_without_reply=True, write_timeout=60
)
self.log_user(update, logger.info, "[bold]虚构叙事挑战数据[/bold]: 成功发送图片", extra={"markup": True})
self.add_delete_message_job(message, delay=1)
@handler.callback_query(pattern=r"^get_challenge_story_history\|", block=False)
async def get_challenge_story_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
async def get_challenge_story_history_callback(
callback_query_data: str,
) -> Tuple[str, str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
_detail = _data[4] if len(_data) > 4 else None
logger.debug(
"callback_query_data函数返回 detail[%s] result[%s] user_id[%s] uid[%s]",
_detail,
_result,
_user_id,
_uid,
)
return _detail, _result, _user_id, _uid
detail, result, user_id, uid = await get_challenge_story_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_challenge_story_history_page(update, user_id, uid, result)
return
data_id = int(result)
if detail:
await self.get_challenge_story_history_floor(update, data_id, detail)
return
await self.get_challenge_story_history_season(update, data_id)