Support gacha log rank

This commit is contained in:
xtaodada 2024-09-12 21:12:23 +08:00
parent 69bc4f878e
commit ca159394cf
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
15 changed files with 752 additions and 8 deletions

View File

@ -0,0 +1,59 @@
"""gacha_log_rank
Revision ID: 1220c5c80757
Revises: 87c6195e5306
Create Date: 2024-09-12 12:02:16.283418
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "1220c5c80757"
down_revision = "87c6195e5306"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"gacha_log_rank",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("player_id", sa.BigInteger(), nullable=False),
sa.Column(
"type",
sa.Enum(
"CHARACTER",
"WEAPON",
"DEFAULT",
"DEFAULT_WEAPON",
"HUN",
"PET",
name="gachalogtypeenum",
),
nullable=False,
),
sa.Column("score_1", sa.BigInteger(), nullable=True),
sa.Column("score_2", sa.BigInteger(), nullable=True),
sa.Column("score_3", sa.BigInteger(), nullable=True),
sa.Column("score_4", sa.BigInteger(), nullable=True),
sa.Column("score_5", sa.BigInteger(), nullable=True),
sa.Column("data", sa.JSON(), nullable=True),
sa.Column(
"time_created",
sa.DateTime(),
server_default=sa.text("now()"),
nullable=True,
),
sa.Column("time_updated", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id", "player_id", "type"),
mysql_charset="utf8mb4",
mysql_collate="utf8mb4_general_ci",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("gacha_log_rank")
# ### end Alembic commands ###

View File

View File

@ -0,0 +1,3 @@
from gram_core.services.gacha_log_rank.cache import GachaLogRankCache
__all__ = ("GachaLogRankCache",)

View File

@ -0,0 +1,7 @@
from gram_core.services.gacha_log_rank.models import GachaLogRank, GachaLogTypeEnum, GachaLogQueryTypeEnum
__all__ = (
"GachaLogRank",
"GachaLogTypeEnum",
"GachaLogQueryTypeEnum",
)

View File

@ -0,0 +1,3 @@
from gram_core.services.gacha_log_rank.repositories import GachaLogRankRepository
__all__ = ("GachaLogRankRepository",)

View File

@ -0,0 +1,3 @@
from gram_core.services.gacha_log_rank.services import GachaLogRankService
__all__ = ("GachaLogRankService",)

@ -1 +1 @@
Subproject commit 42ce49e834dfd4ca59f2f1e59145a0785052c85b
Subproject commit e060fcac750ce32f2abad06c7db3536ba0937f05

View File

@ -12,6 +12,7 @@ from simnet.errors import AuthkeyTimeout, InvalidAuthkey
from simnet.models.starrail.wish import StarRailBannerType
from simnet.utils.player import recognize_starrail_server
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from metadata.pool.pool import get_pool_by_id
from modules.gacha_log.const import GACHA_TYPE_LIST
from modules.gacha_log.error import (
@ -35,6 +36,7 @@ from modules.gacha_log.models import (
SRGFModel,
)
from modules.gacha_log.online_view import GachaLogOnlineView
from modules.gacha_log.ranks import GachaLogRanks
from utils.const import PROJECT_ROOT
from utils.uid import mask_number
@ -46,8 +48,14 @@ GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "warp_log")
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
class GachaLog(GachaLogOnlineView):
def __init__(self, gacha_log_path: Path = GACHA_LOG_PATH):
class GachaLog(GachaLogOnlineView, GachaLogRanks):
def __init__(
self,
gacha_log_path: Path = GACHA_LOG_PATH,
gacha_log_rank_service: GachaLogRankService = None,
):
GachaLogOnlineView.__init__(self)
GachaLogRanks.__init__(self, gacha_log_rank_service)
self.gacha_log_path = gacha_log_path
@staticmethod
@ -232,6 +240,7 @@ class GachaLog(GachaLogOnlineView):
gacha_log.update_time = datetime.datetime.now()
gacha_log.import_type = import_type.value
await self.save_gacha_log_info(str(user_id), uid, gacha_log)
await self.recount_one_from_uid(user_id, player_id)
return new_num
except GachaLogAccountNotFound as e:
raise GachaLogAccountNotFound("导入失败,文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同") from e
@ -337,7 +346,7 @@ class GachaLog(GachaLogOnlineView):
isUp, isBig = False, False
data = {
"name": item.name,
"icon": assets.avatar.square(item.name).as_uri(),
"icon": assets.avatar.square(item.name).as_uri() if assets else "",
"count": count,
"type": "角色",
"isUp": isUp,
@ -348,7 +357,7 @@ class GachaLog(GachaLogOnlineView):
elif item.item_type == "光锥" and pool_name in {"光锥跃迁", "常驻跃迁"}:
data = {
"name": item.name,
"icon": assets.light_cone.icon(item.name).as_uri(),
"icon": assets.light_cone.icon(item.name).as_uri() if assets else "",
"count": count,
"type": "光锥",
"isUp": False,
@ -376,7 +385,7 @@ class GachaLog(GachaLogOnlineView):
if item.item_type == "角色":
data = {
"name": item.name,
"icon": assets.avatar.square(item.name).as_uri(),
"icon": assets.avatar.square(item.name).as_uri() if assets else "",
"count": count,
"type": "角色",
"time": item.time,
@ -385,7 +394,7 @@ class GachaLog(GachaLogOnlineView):
elif item.item_type == "光锥":
data = {
"name": item.name,
"icon": assets.light_cone.icon(item.name).as_uri(),
"icon": assets.light_cone.icon(item.name).as_uri() if assets else "",
"count": count,
"type": "光锥",
"time": item.time,
@ -532,6 +541,19 @@ class GachaLog(GachaLogOnlineView):
gacha_log, status = await self.load_history_info(str(user_id), str(player_id))
if not status:
raise GachaLogNotFound
return await self.get_analysis_data(gacha_log, pool, assets)
async def get_analysis_data(
self, gacha_log: "GachaLogInfo", pool: StarRailBannerType, assets: Optional["AssetsService"]
):
"""
获取抽卡记录分析数据
:param gacha_log: 抽卡记录
:param pool: 池子类型
:param assets: 资源服务
:return: 分析数据
"""
player_id = int(gacha_log.uid)
pool_name = GACHA_TYPE_LIST[pool]
if pool_name not in gacha_log.item_list:
raise GachaLogNotFound

147
modules/gacha_log/ranks.py Normal file
View File

@ -0,0 +1,147 @@
import asyncio
import contextlib
from abc import abstractmethod
from pathlib import Path
from typing import List, Optional, TYPE_CHECKING, Dict
from simnet.models.starrail.wish import StarRailBannerType
from core.services.gacha_log_rank.services import GachaLogRankService
from core.services.gacha_log_rank.models import GachaLogRank, GachaLogTypeEnum, GachaLogQueryTypeEnum
from modules.gacha_log.error import GachaLogNotFound
from modules.gacha_log.models import GachaLogInfo, ImportType
from utils.log import logger
if TYPE_CHECKING:
from core.dependence.assets import AssetsService
from telegram import Message
class GachaLogError(Exception):
"""抽卡记录异常"""
class GachaLogRanks:
"""抽卡记录排行榜"""
gacha_log_path: Path
ITEM_LIST_MAP = {
"角色跃迁": GachaLogTypeEnum.CHARACTER,
"光锥跃迁": GachaLogTypeEnum.WEAPON,
"常驻跃迁": GachaLogTypeEnum.DEFAULT,
}
ITEM_LIST_MAP_REV = {
GachaLogTypeEnum.CHARACTER: "角色跃迁",
GachaLogTypeEnum.WEAPON: "光锥跃迁",
GachaLogTypeEnum.DEFAULT: "常驻跃迁",
}
BANNER_TYPE_MAP = {
"角色跃迁": StarRailBannerType.CHARACTER,
"光锥跃迁": StarRailBannerType.WEAPON,
"常驻跃迁": StarRailBannerType.PERMANENT,
}
SCORE_TYPE_MAP = {
"五星平均": GachaLogQueryTypeEnum.FIVE_STAR_AVG,
"UP平均": GachaLogQueryTypeEnum.UP_STAR_AVG,
"小保底不歪": GachaLogQueryTypeEnum.NO_WARP,
}
def __init__(
self,
gacha_log_rank_service: GachaLogRankService = None,
):
self.gacha_log_rank_service = gacha_log_rank_service
@staticmethod
@abstractmethod
async def load_json(path):
"""加载json文件"""
@abstractmethod
async def get_analysis_data(
self, gacha_log: "GachaLogInfo", pool: StarRailBannerType, assets: Optional["AssetsService"]
):
"""
获取抽卡记录分析数据
:param gacha_log: 抽卡记录
:param pool: 池子类型
:param assets: 资源服务
:return: 分析数据
"""
def parse_analysis_data(self, player_id: int, rank_type: "GachaLogTypeEnum", data: Dict) -> GachaLogRank:
line = data["line"]
total = data["allNum"]
rank = GachaLogRank(player_id=player_id, type=rank_type, score_1=total)
for l1 in line:
for l2 in l1:
label = l2["lable"]
if label in self.SCORE_TYPE_MAP:
gacha_log_type = self.SCORE_TYPE_MAP[label]
value = int(float(l2["num"]) * 100)
setattr(rank, gacha_log_type.value, value)
return rank
async def recount_one_data(self, file_path: Path) -> List[GachaLogRank]:
"""重新计算一个文件的数据"""
try:
gacha_log = GachaLogInfo.parse_obj(await self.load_json(file_path))
if gacha_log.get_import_type != ImportType.PaiGram:
raise GachaLogError("不支持的抽卡记录类型")
except ValueError as e:
raise GachaLogError from e
player_id = int(gacha_log.uid)
data = []
for k, v in self.BANNER_TYPE_MAP.items():
rank_type = self.ITEM_LIST_MAP[k]
try:
gacha_log_data = await self.get_analysis_data(gacha_log, v, None)
except GachaLogNotFound:
continue
rank = self.parse_analysis_data(player_id, rank_type, gacha_log_data)
data.append(rank)
return data
async def recount_one_from_uid(self, user_id: int, uid: int):
save_path = self.gacha_log_path / f"{user_id}-{uid}.json"
await self.recount_one(save_path)
async def recount_one(self, file_path: Path):
if not file_path.exists():
return
try:
ranks = await self.recount_one_data(file_path)
if ranks:
await self.add_or_update(ranks)
except GachaLogError:
logger.warning("更新抽卡排名失败 file[%s]", file_path)
async def add_or_update(self, ranks: List["GachaLogRank"]):
"""添加或更新用户数据"""
old_ranks = await self.gacha_log_rank_service.get_rank_by_user_id(ranks[0].player_id)
old_ranks_map = {r.type: r for r in old_ranks}
for rank in ranks:
old_rank = old_ranks_map.get(rank.type)
if old_rank:
old_rank.update_by_new(rank)
await self.gacha_log_rank_service.update(old_rank)
else:
await self.gacha_log_rank_service.add(rank)
async def recount_all_data(self, message: "Message"):
"""重新计算所有数据"""
for key1 in GachaLogTypeEnum:
for key2 in GachaLogQueryTypeEnum:
await self.gacha_log_rank_service.del_all_cache_by_type(key1, key2) # noqa
files = [f for f in self.gacha_log_path.glob("*.json") if len(f.stem.split("-")) == 2]
tasks = []
for idx, f in enumerate(files):
tasks.append(self.recount_one(f))
if len(tasks) >= 10:
await asyncio.gather(*tasks)
tasks.clear()
if idx % 10 == 1:
with contextlib.suppress(Exception):
await message.edit_text(f"已处理 {idx + 1}/{len(files)} 个文件")
if tasks:
await asyncio.gather(*tasks)

View File

@ -37,6 +37,7 @@ class SetCommandPlugin(Plugin):
BotCommand("help", "帮助"),
BotCommand("warp_log", "查看跃迁记录"),
BotCommand("warp_log_online_view", "抽卡记录在线浏览"),
BotCommand("warp_log_rank", "抽卡排行榜"),
BotCommand("action_log", "查询登录记录"),
BotCommand("dailynote", "查询实时便笺"),
BotCommand("redeem", "(国际服)兑换 Key"),
@ -78,6 +79,7 @@ class SetCommandPlugin(Plugin):
BotCommand("get_chat", "获取会话信息"),
BotCommand("add_block", "添加黑名单"),
BotCommand("del_block", "移除黑名单"),
BotCommand("warp_log_rank_recount", "重新统计抽卡排行榜"),
]
await context.bot.delete_my_commands()
await context.bot.set_my_commands(commands=group_command)

View File

@ -16,6 +16,7 @@ from core.services.template.models import FileType
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from modules.gacha_log.const import SRGF_VERSION, GACHA_TYPE_LIST_REVERSE
from modules.gacha_log.error import (
GachaLogAccountNotFound,
@ -69,6 +70,7 @@ class WishLogPlugin(Plugin.Conversation):
cookie_service: CookiesService,
head_icon: HeadIconService,
phone_theme: PhoneThemeService,
gacha_log_rank: GachaLogRankService,
):
self.template_service = template_service
self.players_service = players_service
@ -76,7 +78,7 @@ class WishLogPlugin(Plugin.Conversation):
self.cookie_service = cookie_service
self.head_icon = head_icon
self.phone_theme = phone_theme
self.gacha_log = GachaLog()
self.gacha_log = GachaLog(gacha_log_rank_service=gacha_log_rank)
self.wish_photo = None
async def get_player_id(self, user_id: int, player_id: int, offset: int) -> int:
@ -561,6 +563,15 @@ class WishLogPlugin(Plugin.Conversation):
logger.error("申请在线查看跃迁记录失败", exc_info=e)
await message.reply_text("申请在线查看跃迁记录失败,请联系管理员")
@handler.command(command="warp_log_rank_recount", block=False, admin=True)
async def wish_log_rank_recount(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user = update.effective_user
logger.info("用户 %s[%s] wish_log_rank_recount 命令请求", user.full_name, user.id)
message = update.effective_message
reply = await message.reply_text("正在重新统计抽卡记录排行榜")
await self.gacha_log.recount_all_data(reply)
await reply.edit_text("重新统计完成")
@staticmethod
async def get_migrate_data(
old_user_id: int, new_user_id: int, old_players: List["Player"]

View File

@ -0,0 +1,351 @@
from typing import TYPE_CHECKING, List, Tuple, Dict
from pydantic import BaseModel
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.error import BadRequest
from telegram.ext import filters
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.template.models import FileType
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.services.gacha_log_rank.models import GachaLogRank, GachaLogTypeEnum, GachaLogQueryTypeEnum
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from gram_core.services.players import PlayersService
from modules.gacha_log.ranks import GachaLogRanks
from plugins.tools.nick_name import NickNameService
from plugins.tools.phone_theme import PhoneThemeService
from utils.log import logger
from utils.uid import mask_number
if TYPE_CHECKING:
from telegram import Update
from telegram.ext import ContextTypes
class RankPlayerModel(BaseModel):
player_id: int
num: int
nickname: str
score_1: int
score_2: float
score_3: float
score_4: float
score_5: float
@property
def mask_uid(self) -> str:
return mask_number(self.player_id)
class RankDataModel(BaseModel):
players: List[RankPlayerModel]
count: int
class WishLogRankPlugin(Plugin):
"""抽卡数据排行"""
TYPES = [
("角色-总抽数", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.TOTAL),
("角色-五星平均", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
("角色-UP平均", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.UP_STAR_AVG),
("角色-小保底百分比", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.NO_WARP),
("光锥-总抽数", GachaLogTypeEnum.WEAPON, GachaLogQueryTypeEnum.TOTAL),
("光锥-五星平均", GachaLogTypeEnum.WEAPON, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
("常驻-总抽数", GachaLogTypeEnum.DEFAULT, GachaLogQueryTypeEnum.TOTAL),
("常驻-五星平均", GachaLogTypeEnum.DEFAULT, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
]
def __init__(
self,
assets_service: AssetsService = None,
template_service: TemplateService = None,
player_service: PlayersService = None,
redis: RedisDB = None,
nick_name_service: NickNameService = None,
phone_theme_service: PhoneThemeService = None,
gacha_log_rank_service: GachaLogRankService = None,
) -> None:
self.assets_service = assets_service
self.template_service = template_service
self.player_service = player_service
self.redis = redis.client
self.nick_name_service = nick_name_service
self.phone_theme_service = phone_theme_service
self.gacha_log_rank_service = gacha_log_rank_service
self.limit = 20
self.key = "plugins:gacha_log_rank"
self.expire = 30 * 60 # 30 分钟
self.expire2 = 5 * 60 # 5 分钟
self.wish_photo = None
async def get_nickname_from_uid(self, player_id: int) -> str:
nickname = "Unknown"
try:
nickname = await self.nick_name_service.get_nick_name(player_id)
except Exception:
logger.warning("获取玩家昵称失败 player_id[%s]", player_id)
return nickname
@staticmethod
def mysql_to_model(rank: "GachaLogRank", num: int, nickname: str) -> RankPlayerModel:
return RankPlayerModel(
player_id=rank.player_id,
num=num,
nickname=nickname,
score_1=rank.score_1 or 0,
score_2=(rank.score_2 / 100.0) if rank.score_2 else 0,
score_3=(rank.score_3 / 100.0) if rank.score_3 else 0,
score_4=(rank.score_4 / 100.0) if rank.score_4 else 0,
score_5=(rank.score_5 / 100.0) if rank.score_5 else 0,
)
@staticmethod
def get_desc_type(query_type: "GachaLogQueryTypeEnum") -> bool:
desc = True
if query_type not in (GachaLogQueryTypeEnum.TOTAL, GachaLogQueryTypeEnum.NO_WARP):
desc = False
return desc
async def get_first_rank_players_from_sql(
self, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
real_desc = self.get_desc_type(query_type)
if desc:
real_desc = not real_desc
ranks_uids = await self.gacha_log_rank_service.get_ranks_cache(rank_type, query_type, desc=real_desc)
count = await self.gacha_log_rank_service.get_ranks_length_cache(rank_type, query_type)
uid_list = [int(uid) for uid, _ in ranks_uids]
ranks = await self.gacha_log_rank_service.get_ranks_by_ids(rank_type, uid_list)
players = []
for rank in ranks:
nickname = await self.get_nickname_from_uid(rank.player_id)
players.append(self.mysql_to_model(rank, uid_list.index(rank.player_id) + 1, nickname))
players.sort(key=lambda x: x.num)
return RankDataModel(players=players, count=count)
async def get_first_rank_players_from_cache(
self, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
desc_int = 1 if desc else 0
key = f"{self.key}:{rank_type.value}:{query_type.value}:{desc_int}:total"
data = await self.redis.get(key)
if data:
return RankDataModel.parse_raw(str(data, encoding="utf-8"))
data = await self.get_first_rank_players_from_sql(rank_type, query_type, desc)
await self.redis.set(key, data.json(by_alias=True), ex=self.expire)
return data
async def get_my_players_from_sql(
self, user_id: int, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
players1 = await self.player_service.get_all_by_user_id(user_id)
ranks = await self.gacha_log_rank_service.get_ranks_by_ids(rank_type, [player.player_id for player in players1])
players = []
real_desc = self.get_desc_type(query_type)
if desc:
real_desc = not real_desc
for rank in ranks:
num = await self.gacha_log_rank_service.get_rank_by_player_id_cache(
rank_type, query_type, rank.player_id, desc=real_desc
)
if num is None:
continue
nickname = await self.get_nickname_from_uid(rank.player_id)
players.append(self.mysql_to_model(rank, num + 1, nickname))
players.sort(key=lambda x: x.num)
return RankDataModel(players=players, count=len(players))
async def get_my_players_from_cache(
self, user_id: int, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
desc_int = 1 if desc else 0
key = f"{self.key}:{rank_type.value}:{query_type.value}:{desc_int}:{user_id}"
data = await self.redis.get(key)
if data:
return RankDataModel.parse_raw(str(data, encoding="utf-8"))
data = await self.get_my_players_from_sql(user_id, rank_type, query_type, desc)
await self.redis.set(key, data.json(by_alias=True), ex=self.expire2)
return data
@staticmethod
def get_data_key_map_by_type(rank_type: "GachaLogTypeEnum"):
data = {
"总抽数": "score_1",
"五星平均": "score_2",
}
if rank_type == GachaLogTypeEnum.CHARACTER:
data.update(
{
"UP平均": "score_3",
"小保底百分比": "score_4",
}
)
return data
def gen_button(self, user_id: int, desc: bool = False) -> List[List[InlineKeyboardButton]]:
types = [self.TYPES[i : i + 2] for i in range(0, len(self.TYPES), 2)]
if desc:
now_bind, new_bind, now_int, new_int = "非酋榜", "欧皇榜", 1, 0
else:
now_bind, new_bind, now_int, new_int = "欧皇榜", "非酋榜", 0, 1
data = [
[
InlineKeyboardButton(
idx[0],
callback_data=f"wish_log_rank|{user_id}|{idx[1].value}|{idx[2].value}|{now_int}",
)
for idx in id1
]
for id1 in types
]
page_button = [
InlineKeyboardButton(f"当前是{now_bind}", callback_data=f"wish_log_rank_button|{user_id}|ignore"),
InlineKeyboardButton(f"切换到{new_bind}", callback_data=f"wish_log_rank_button|{user_id}|{new_int}"),
]
data.append(page_button)
return data
@handler.command("warp_log_rank", block=False)
@handler.message(filters.Regex(r"^抽卡排行榜(.*)$"), block=False)
async def wish_log_rank(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
user_id = await self.get_real_user_id(update)
message = update.effective_message
buttons = self.gen_button(user_id)
if isinstance(self.wish_photo, str):
photo = self.wish_photo
else:
photo = open("resources/img/wish.jpg", "rb")
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
reply_message = await message.reply_photo(
photo=photo,
caption="请选择你要查询的抽卡排行榜",
reply_markup=InlineKeyboardMarkup(buttons),
)
if reply_message.photo:
self.wish_photo = reply_message.photo[-1].file_id
@handler.callback_query(pattern=r"^wish_log_rank\|", block=False)
async def wish_log_rank_callback(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
async def get_wish_log_rank_callback(
callback_query_data: str,
) -> Tuple[int, GachaLogTypeEnum, GachaLogQueryTypeEnum, bool]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_rank_type = GachaLogTypeEnum(int(_data[2]))
_query_type = GachaLogQueryTypeEnum(_data[3])
_desc = bool(int(_data[4]))
logger.debug(
"callback_query_data函数返回 user_id[%s] rank_type[%s] query_type[%s] desc[%s]",
_user_id,
_rank_type,
_query_type,
_desc,
)
return _user_id, _rank_type, _query_type, _desc
try:
user_id, rank_type, query_type, desc = await get_wish_log_rank_callback(callback_query.data)
except IndexError:
await callback_query.answer("按钮数据已过期,请重新获取。", show_alert=True)
self.add_delete_message_job(message, delay=1)
return
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
await self.render(update, context, user_id, rank_type, query_type, desc)
async def get_render_data(
self,
user_id: int,
rank_type: "GachaLogTypeEnum",
query_type: "GachaLogQueryTypeEnum",
desc: bool,
) -> Dict:
my_data = await self.get_my_players_from_cache(user_id, rank_type, query_type, desc)
list_data = await self.get_first_rank_players_from_cache(rank_type, query_type, desc)
name_card = self.phone_theme_service.get_default_phone_theme().as_uri()
return {
"data_list": [my_data, list_data],
"count": list_data.count,
"namecard": name_card,
"pool_name": GachaLogRanks.ITEM_LIST_MAP_REV.get(rank_type),
"data_key_map": self.get_data_key_map_by_type(rank_type),
"main_key": query_type,
"desc": desc,
}
async def render(
self,
update: "Update",
_: "ContextTypes.DEFAULT_TYPE",
user_id: int,
rank_type: "GachaLogTypeEnum",
query_type: "GachaLogQueryTypeEnum",
desc: bool = False,
):
callback_query = update.callback_query
message = callback_query.message
await message.reply_chat_action(ChatAction.TYPING)
render_data = await self.get_render_data(user_id, rank_type, query_type, desc)
try:
await callback_query.answer(text="正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
except BadRequest:
pass
png_data = await self.template_service.render(
"genshin/gacha_log_rank/rank.jinja2",
render_data,
viewport={"width": 1040, "height": 500},
full_page=True,
query_selector=".container",
file_type=FileType.PHOTO,
ttl=1 * 60 * 60,
)
await png_data.edit_media(message)
@handler.callback_query(pattern=r"^wish_log_rank_button\|", block=False)
async def wish_log_rank_button_callback(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
async def get_wish_log_rank_button_callback(
callback_query_data: str,
) -> Tuple[int, bool, bool]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_ignore = _data[2] == "ignore"
_desc = False if _ignore else bool(int(_data[2]))
logger.debug(
"callback_query_data函数返回 user_id[%s] ignore[%s] desc[%s]",
_user_id,
_ignore,
_desc,
)
return _user_id, _ignore, _desc
try:
user_id, ignore, desc = await get_wish_log_rank_button_callback(callback_query.data)
except IndexError:
await callback_query.answer("按钮数据已过期,请重新获取。", show_alert=True)
self.add_delete_message_job(message, delay=1)
return
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if ignore:
await callback_query.answer("无效按钮", show_alert=False)
return
buttons = self.gen_button(user_id, desc)
await message.edit_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer("已切换", show_alert=False)

View File

@ -0,0 +1,61 @@
from typing import Optional
from core.dependence.assets import AssetsService
from core.services.players.services import PlayerInfoService
from gram_core.dependence.redisdb import RedisDB
from gram_core.plugin import Plugin
class NickNameService(Plugin):
def __init__(
self,
player_info_service: PlayerInfoService,
asset_service: AssetsService,
redis: RedisDB,
) -> None:
self.player_info_service = player_info_service
self.assets = asset_service
self.redis = redis.client
self.expire = 60 * 60
self.qname = "plugins:nick_name"
@staticmethod
def get_default_nickname() -> str:
return "Unknown"
async def get_from_cache(self, player_id: int) -> Optional[str]:
key = f"{self.qname}:{player_id}"
data = await self.redis.get(key)
if data is None:
return None
return str(data, encoding="utf-8")
async def set_to_cache(self, player_id: int, nick_name: str) -> None:
key = f"{self.qname}:{player_id}"
await self.redis.set(key, nick_name, ex=self.expire)
async def get_from_sql(self, player_id: int) -> Optional[str]:
player_info = await self.player_info_service.get_by_player_id(player_id)
if player_info is None:
return None
return player_info.nickname
async def get_from_mihomo(self, player_id: int) -> Optional[str]:
player_info = await self.player_info_service.get_player_info_from_mihomo(player_id)
if player_info is None:
return None
return player_info.nickname
async def get_nick_name(self, player_id: int) -> Optional[str]:
nickname = await self.get_from_cache(player_id)
if nickname is not None:
return nickname
nickname = await self.get_from_sql(player_id)
if nickname is not None:
await self.set_to_cache(player_id, nickname)
return nickname
nickname = await self.get_from_mihomo(player_id)
if nickname is not None:
await self.set_to_cache(player_id, nickname)
return nickname
return None

View File

@ -0,0 +1,64 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Avatar List</title>
<link type="text/css" href="./style.css" rel="stylesheet"/>
<link type="text/css" href="../avatar_list/style.css" rel="stylesheet"/>
<link type="text/css" href="../../styles/public.css" rel="stylesheet"/>
</head>
<body>
<div class="container">
<div class="head" style="background-image: url('{{ namecard }}')">
<div class="player">
<div>
<div class="nickname">抽卡排行榜 - {{ pool_name }}</div>
{% if desc %}
{% set rank_name = "非酋榜" %}
{% else %}
{% set rank_name = "欧皇榜" %}
{% endif %}
<div class="uid">{{ rank_name }} - 共 {{ count }} 条数据</div>
</div>
</div>
<div class="logo"></div>
</div>
{% for my_data in data_list %}
<div class="content">
<div class="row">
<div>#</div>
<div style="flex: 3">UID</div>
<div style="flex: 3">昵称</div>
{% for key in data_key_map.keys() %}
<div style="flex: 3">{{ key }}</div>
{% endfor %}
</div>
{% for data in my_data.players %}
<div
{% if loop.index is even %}
class="row second-row"
{% else %}
class="row"
{% endif %}
>
<div>{{ data.num }}</div>
<div style="flex: 3">{{ data.mask_uid }}</div>
<div style="flex: 3" class="username">{{ data.nickname }}</div>
{% for value in data_key_map.values() %}
{% if value == main_key %}
{% set style = "background-color: rgb(229 171 229/70%);" %}
{% else %}
{% set style = "" %}
{% endif %}
<div style="flex: 3; {{ style }}">{{ data[value] }}</div>
{% endfor %}
</div>
{% endfor %}
</div>
{% if loop.index == 1 %}
<div style="height: 50px"></div>
{% endif %}
{% endfor %}
</div>
</body>
</html>

View File

@ -0,0 +1,11 @@
.username {
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.player {
margin-left: 50px;
}
.head {
background-size: cover!important;
}