Support query abyss history

This commit is contained in:
omg-xtao 2024-04-27 18:13:54 +08:00 committed by GitHub
parent 1b36129dea
commit fbb7f028f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 415 additions and 15 deletions

View File

@ -0,0 +1,52 @@
"""history_data
Revision ID: 87c6195e5306
Revises: 369fb74daad9
Create Date: 2024-04-26 22:57:42.309397
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "87c6195e5306"
down_revision = "369fb74daad9"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"history_data",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("user_id", sa.BigInteger(), nullable=False),
sa.Column("data_id", sa.BigInteger(), nullable=True),
sa.Column(
"time_created",
sa.DateTime(),
server_default=sa.text("now()"),
nullable=True,
),
sa.Column("time_updated", sa.DateTime(), nullable=True),
sa.Column("type", sa.Integer(), nullable=False),
sa.Column("data", sa.JSON(), nullable=True),
sa.PrimaryKeyConstraint("id", "user_id", "type"),
mysql_charset="utf8mb4",
mysql_collate="utf8mb4_general_ci",
)
op.create_index(
op.f("ix_history_data_user_id"),
"history_data",
["user_id"],
unique=False,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_history_data_user_id"), table_name="history_data")
op.drop_table("history_data")
# ### end Alembic commands ###

View File

View File

@ -0,0 +1,26 @@
import enum
from typing import Dict
from pydantic import BaseModel
from simnet.models.genshin.chronicle.abyss import SpiralAbyss
from gram_core.services.history_data.models import HistoryData
__all__ = (
"HistoryData",
"HistoryDataTypeEnum",
"HistoryDataAbyss",
)
class HistoryDataTypeEnum(int, enum.Enum):
ABYSS = 0 # 深境螺旋
class HistoryDataAbyss(BaseModel):
abyss_data: SpiralAbyss
character_data: Dict[int, int]
@classmethod
def from_data(cls, data: HistoryData):
return cls.parse_obj(data.data)

View File

@ -0,0 +1,3 @@
from gram_core.services.history_data.repositories import HistoryDataRepository
__all__ = ("HistoryDataRepository",)

View File

@ -0,0 +1,48 @@
import datetime
from typing import Dict, List
from pytz import timezone
from simnet.models.genshin.chronicle.abyss import SpiralAbyss
from core.services.history_data.models import HistoryData, HistoryDataTypeEnum, HistoryDataAbyss
from gram_core.base_service import BaseService
from gram_core.services.history_data.services import HistoryDataBaseServices
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
__all__ = (
"HistoryDataBaseServices",
"HistoryDataAbyssServices",
)
TZ = timezone("Asia/Shanghai")
def json_encoder(value):
if isinstance(value, datetime.datetime):
return value.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S")
return value
class HistoryDataAbyssServices(BaseService, HistoryDataBaseServices):
DATA_TYPE = HistoryDataTypeEnum.ABYSS.value
@staticmethod
def exists_data(data: HistoryData, old_data: List[HistoryData]) -> bool:
return any(d.data == data.data for d in old_data)
@staticmethod
def create(user_id: int, abyss_data: SpiralAbyss, character_data: Dict[int, int]):
data = HistoryDataAbyss(abyss_data=abyss_data, character_data=character_data)
json_data = data.json(by_alias=True, encoder=json_encoder)
return HistoryData(
user_id=user_id,
data_id=abyss_data.season,
time_created=datetime.datetime.now(),
type=HistoryDataAbyssServices.DATA_TYPE,
data=jsonlib.loads(json_data),
)

@ -1 +1 @@
Subproject commit 14bc3c5a191e13873cae57a5fe36d35641d51491
Subproject commit 481770502884afbce7d6b38f0448fec9e749885e

View File

@ -64,6 +64,7 @@ class SetCommandPlugin(Plugin):
BotCommand("ledger", "查询当月旅行札记"),
BotCommand("abyss", "查询深渊战绩"),
BotCommand("abyss_team", "查询深渊推荐配队"),
BotCommand("abyss_history", "查询深渊历史战绩"),
BotCommand("avatars", "查询角色练度"),
BotCommand("reg_time", "账号注册时间"),
BotCommand("daily_material", "今日素材表"),

View File

@ -1,24 +1,31 @@
"""深渊数据查询"""
import asyncio
import math
import re
from datetime import datetime
from functools import lru_cache
from typing import Any, Coroutine, List, Optional, Tuple, Union
from typing import Any, Coroutine, List, Optional, Tuple, Union, Dict
from arkowrapper import ArkoWrapper
from pytz import timezone
from simnet import GenshinClient
from telegram import Message, Update
from simnet.models.genshin.chronicle.abyss import SpiralAbyss
from telegram import Message, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction, ParseMode
from telegram.ext import CallbackContext, filters
from telegram.ext import CallbackContext, filters, ContextTypes
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.cookies.error import TooManyRequestPublicCookies
from core.services.history_data.models import HistoryDataAbyss
from core.services.history_data.services import HistoryDataAbyssServices
from core.services.template.models import RenderGroupResult, RenderResult
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from plugins.tools.genshin import GenshinHelper
from utils.enkanetwork import RedisCache
from utils.log import logger
from utils.uid import mask_number
@ -71,10 +78,14 @@ class AbyssPlugin(Plugin):
template: TemplateService,
helper: GenshinHelper,
assets_service: AssetsService,
history_data_abyss: HistoryDataAbyssServices,
redis: RedisDB,
):
self.template_service = template
self.helper = helper
self.assets_service = assets_service
self.history_data_abyss = history_data_abyss
self.cache = RedisCache(redis.client, key="plugin:abyss:history")
@handler.command("abyss", block=False)
@handler.message(filters.Regex(r"^深渊数据"), block=False)
@ -128,7 +139,8 @@ class AbyssPlugin(Plugin):
async with self.helper.genshin_or_public(user_id) as client:
if not client.public:
await client.get_record_cards()
images = await self.get_rendered_pic(client, client.player_id, floor, total, previous)
abyss_data, avatar_data = await self.get_rendered_pic_data(client, client.player_id, previous)
images = await self.get_rendered_pic(abyss_data, avatar_data, client.player_id, floor, total)
except AbyssUnlocked: # 若深渊未解锁
await message.reply_text("还未解锁深渊哦~")
return
@ -164,18 +176,29 @@ class AbyssPlugin(Plugin):
self.log_user(update, logger.info, "[bold]深渊挑战数据[/bold]: 成功发送图片", extra={"markup": True})
async def get_rendered_pic(
self, client: GenshinClient, uid: int, floor: int, total: bool, previous: bool
async def get_rendered_pic_data(
self, client: GenshinClient, uid: int, previous: bool
) -> Tuple["SpiralAbyss", Dict[int, int]]:
abyss_data = await client.get_genshin_spiral_abyss(uid, previous=previous, lang="zh-cn")
avatar_data = {}
if not client.public: # noqa
avatars = await client.get_genshin_characters(uid, lang="zh-cn")
avatar_data = {i.id: i.constellation for i in avatars}
await self.save_abyss_data(uid, abyss_data, avatar_data)
return abyss_data, avatar_data
async def get_rendered_pic( # skipcq: PY-R1000 #
self, abyss_data: "SpiralAbyss", avatar_data: Dict[int, int], uid: int, floor: int, total: bool
) -> Union[Tuple[Any], List[RenderResult], None]:
"""
获取渲染后的图片
Args:
client (Client): 获取 genshin 数据的 client
abyss_data (SpiralAbyss): 深渊数据
avatar_data (Dict[int, int]): 角色数据
uid (int): 需要查询的 uid
floor (int): 层数
total (bool): 是否为总览
previous (bool): 是否为上期
Returns:
bytes格式的图片
@ -186,8 +209,6 @@ class AbyssPlugin(Plugin):
return value.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S")
return value
abyss_data = await client.get_genshin_spiral_abyss(uid, previous=previous, lang="zh-cn")
if not abyss_data.unlocked:
raise AbyssUnlocked
if not abyss_data.ranks.most_kills:
@ -222,9 +243,9 @@ class AbyssPlugin(Plugin):
11: "#252550",
12: "#1D2A4A",
}
if total:
avatars = await client.get_genshin_characters(uid, lang="zh-cn")
render_data["avatar_data"] = {i.id: i.constellation for i in avatars}
render_data["avatar_data"] = avatar_data
data = jsonlib.loads(result)
render_data["data"] = data
@ -288,8 +309,7 @@ class AbyssPlugin(Plugin):
floors = jsonlib.loads(result)["floors"]
if not (floor_data := list(filter(lambda x: x["floor"] == floor, floors))):
return None
avatars = await client.get_genshin_characters(uid, lang="zh-cn")
render_data["avatar_data"] = {i.id: i.constellation for i in avatars}
render_data["avatar_data"] = avatar_data
render_data["floor"] = floor_data[0]
render_data["total_stars"] = f"{floor_data[0]['stars']}/{floor_data[0]['max_stars']}"
return [
@ -297,3 +317,253 @@ class AbyssPlugin(Plugin):
"genshin/abyss/floor.jinja2", render_data, viewport={"width": 690, "height": 500}
)
]
async def save_abyss_data(self, uid: int, abyss_data: "SpiralAbyss", character_data: Dict[int, int]):
model = self.history_data_abyss.create(uid, abyss_data, character_data)
old_data = await self.history_data_abyss.get_by_user_id_data_id(uid, model.data_id)
exists = self.history_data_abyss.exists_data(model, old_data)
if not exists:
await self.history_data_abyss.add(model)
async def get_abyss_data(self, uid: int):
return await self.history_data_abyss.get_by_user_id(uid)
@staticmethod
def get_season_data_name(data: "HistoryDataAbyss"):
start_time = data.abyss_data.start_time.astimezone(TZ)
time = start_time.strftime("%Y.%m ")[2:] + ("" if start_time.day <= 15 else "")
honor = ""
if data.abyss_data.total_stars == 36:
if data.abyss_data.total_battles == 12:
honor = "👑"
last_battles = data.abyss_data.floors[-1].chambers[-1].battles
num_of_characters = max(
len(last_battles[0].characters),
len(last_battles[1].characters),
)
if num_of_characters == 2:
honor = "双通"
elif num_of_characters == 1:
honor = "单通"
return f"{time} {data.abyss_data.total_stars}{honor}"
async def get_session_button_data(self, user_id: int, uid: int, force: bool = False):
redis = await self.cache.get(str(uid))
if redis and not force:
return redis["buttons"]
data = await self.get_abyss_data(uid)
data.sort(key=lambda x: x.id, reverse=True)
abyss_data = [HistoryDataAbyss.from_data(i) for i in data]
buttons = [
{
"name": AbyssPlugin.get_season_data_name(abyss_data[idx]),
"value": f"get_abyss_history|{user_id}|{uid}|{value.id}",
}
for idx, value in enumerate(data)
]
await self.cache.set(str(uid), {"buttons": buttons})
return buttons
async def gen_season_button(
self,
user_id: int,
uid: int,
page: int = 1,
) -> List[List[InlineKeyboardButton]]:
"""生成按钮"""
data = await self.get_session_button_data(user_id, uid)
if not data:
return []
buttons = [
InlineKeyboardButton(
value["name"],
callback_data=value["value"],
)
for value in data
]
all_buttons = [buttons[i : i + 3] for i in range(0, len(buttons), 3)]
send_buttons = all_buttons[(page - 1) * 5 : page * 5]
last_page = page - 1 if page > 1 else 0
all_page = math.ceil(len(all_buttons) / 5)
next_page = page + 1 if page < all_page and all_page > 1 else 0
last_button = []
if last_page:
last_button.append(
InlineKeyboardButton(
"<< 上一页",
callback_data=f"get_abyss_history|{user_id}|{uid}|p_{last_page}",
)
)
if last_page or next_page:
last_button.append(
InlineKeyboardButton(
f"{page}/{all_page}",
callback_data=f"get_abyss_history|{user_id}|{uid}|empty_data",
)
)
if next_page:
last_button.append(
InlineKeyboardButton(
"下一页 >>",
callback_data=f"get_abyss_history|{user_id}|{uid}|p_{next_page}",
)
)
if last_button:
send_buttons.append(last_button)
return send_buttons
@staticmethod
async def gen_floor_button(
data_id: int,
abyss_data: "HistoryDataAbyss",
user_id: int,
uid: int,
) -> List[List[InlineKeyboardButton]]:
floors = [i.floor for i in abyss_data.abyss_data.floors if i.floor]
floors.sort()
buttons = [
InlineKeyboardButton(
f"{i}",
callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|{i}",
)
for i in floors
]
send_buttons = [buttons[i : i + 4] for i in range(0, len(buttons), 4)]
all_buttons = [
InlineKeyboardButton(
"<< 返回",
callback_data=f"get_abyss_history|{user_id}|{uid}|p_1",
),
InlineKeyboardButton(
"总览",
callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|total",
),
InlineKeyboardButton(
"所有",
callback_data=f"get_abyss_history|{user_id}|{uid}|{data_id}|all",
),
]
send_buttons.append(all_buttons)
return send_buttons
@handler.command("abyss_history", block=False)
@handler.message(filters.Regex(r"^深渊历史数据"), block=False)
async def abyss_history_command_start(self, update: Update, _: CallbackContext) -> None:
user_id = await self.get_real_user_id(update)
message = update.effective_message
self.log_user(update, logger.info, "查询深渊历史数据")
async with self.helper.genshin_or_public(user_id) as client:
await self.get_session_button_data(user_id, client.player_id, force=True)
buttons = await self.gen_season_button(user_id, client.player_id)
if not buttons:
await message.reply_text("还没有深渊历史数据哦~")
return
await message.reply_text("请选择要查询的深渊历史数据", reply_markup=InlineKeyboardMarkup(buttons))
async def get_abyss_history_page(self, update: "Update", user_id: int, result: str):
"""翻页处理"""
callback_query = update.callback_query
self.log_user(update, logger.info, "切换深渊历史数据页 page[%s]", result)
page = int(result.split("_")[1])
async with self.helper.genshin_or_public(user_id) as client:
buttons = await self.gen_season_button(user_id, client.player_id, page)
if not buttons:
await callback_query.answer("还没有深渊历史数据哦~", show_alert=True)
await callback_query.edit_message_text("还没有深渊历史数据哦~")
return
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer(f"已切换到第 {page}", show_alert=False)
async def get_abyss_history_season(self, update: "Update", data_id: int):
"""进入选择层数"""
callback_query = update.callback_query
user = callback_query.from_user
self.log_user(update, logger.info, "切换深渊历史数据到层数页 data_id[%s]", data_id)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令~", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataAbyss.from_data(data)
buttons = await self.gen_floor_button(data_id, abyss_data, user.id, data.user_id)
await callback_query.edit_message_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer("已切换到层数页", show_alert=False)
async def get_abyss_history_floor(self, update: "Update", data_id: int, detail: str):
"""渲染层数数据"""
callback_query = update.callback_query
message = callback_query.message
floor = 0
total = False
if detail == "total":
floor = 0
elif detail == "all":
total = True
else:
floor = int(detail)
data = await self.history_data_abyss.get_by_id(data_id)
if not data:
await callback_query.answer("数据不存在,请尝试重新发送命令", show_alert=True)
await callback_query.edit_message_text("数据不存在,请尝试重新发送命令~")
return
abyss_data = HistoryDataAbyss.from_data(data)
images = await self.get_rendered_pic(
abyss_data.abyss_data, abyss_data.character_data, data.user_id, floor, total
)
if images is None:
await callback_query.answer(f"还没有第 {floor} 层的挑战数据", show_alert=True)
return
await callback_query.answer("正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
for group in ArkoWrapper(images).group(10): # 每 10 张图片分一个组
await RenderGroupResult(results=group).reply_media_group(
message, allow_sending_without_reply=True, write_timeout=60
)
self.log_user(update, logger.info, "[bold]深渊挑战数据[/bold]: 成功发送图片", extra={"markup": True})
self.add_delete_message_job(message, delay=1)
@handler.callback_query(pattern=r"^get_abyss_history\|", block=False)
async def get_abyss_history(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
async def get_abyss_history_callback(
callback_query_data: str,
) -> Tuple[str, str, int, int]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_uid = int(_data[2])
_result = _data[3]
_detail = _data[4] if len(_data) > 4 else None
logger.debug(
"callback_query_data函数返回 detail[%s] result[%s] user_id[%s] uid[%s]",
_detail,
_result,
_user_id,
_uid,
)
return _detail, _result, _user_id, _uid
detail, result, user_id, _ = await get_abyss_history_callback(callback_query.data)
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if result == "empty_data":
await callback_query.answer(text="此按钮不可用", show_alert=True)
return
if result.startswith("p_"):
await self.get_abyss_history_page(update, user_id, result)
return
data_id = int(result)
if detail:
await self.get_abyss_history_floor(update, data_id, detail)
return
await self.get_abyss_history_season(update, data_id)