Support gacha log rank

This commit is contained in:
xtaodada 2024-09-12 20:29:24 +08:00
parent 289f164a0c
commit 106da758f2
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
14 changed files with 690 additions and 10 deletions

View File

@ -0,0 +1,61 @@
"""gacha_log_rank
Revision ID: 1220c5c80757
Revises: 87c6195e5306
Create Date: 2024-09-12 12:02:16.283418
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "1220c5c80757"
down_revision = "87c6195e5306"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"gacha_log_rank",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("player_id", sa.BigInteger(), nullable=False),
sa.Column(
"type",
sa.Enum(
"CHARACTER",
"WEAPON",
"DEFAULT",
"DEFAULT_WEAPON",
"HUN",
"PET",
name="gachalogtypeenum",
),
nullable=False,
),
sa.Column("score_1", sa.BigInteger(), nullable=True),
sa.Column("score_2", sa.BigInteger(), nullable=True),
sa.Column("score_3", sa.BigInteger(), nullable=True),
sa.Column("score_4", sa.BigInteger(), nullable=True),
sa.Column("score_5", sa.BigInteger(), nullable=True),
sa.Column("data", sa.JSON(), nullable=True),
sa.Column(
"time_created",
sa.DateTime(),
server_default=sa.text("now()"),
nullable=True,
),
sa.Column("time_updated", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id", "player_id", "type"),
mysql_charset="utf8mb4",
mysql_collate="utf8mb4_general_ci",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("gacha_log_rank")
# ### end Alembic commands ###

View File

View File

@ -0,0 +1,3 @@
from gram_core.services.gacha_log_rank.cache import GachaLogRankCache
__all__ = ("GachaLogRankCache",)

View File

@ -0,0 +1,7 @@
from gram_core.services.gacha_log_rank.models import GachaLogRank, GachaLogTypeEnum, GachaLogQueryTypeEnum
__all__ = (
"GachaLogRank",
"GachaLogTypeEnum",
"GachaLogQueryTypeEnum",
)

View File

@ -0,0 +1,3 @@
from gram_core.services.gacha_log_rank.repositories import GachaLogRankRepository
__all__ = ("GachaLogRankRepository",)

View File

@ -0,0 +1,3 @@
from gram_core.services.gacha_log_rank.services import GachaLogRankService
__all__ = ("GachaLogRankService",)

@ -1 +1 @@
Subproject commit 42ce49e834dfd4ca59f2f1e59145a0785052c85b Subproject commit e060fcac750ce32f2abad06c7db3536ba0937f05

View File

@ -12,6 +12,7 @@ from simnet.errors import AuthkeyTimeout, InvalidAuthkey
from simnet.models.zzz.wish import ZZZBannerType from simnet.models.zzz.wish import ZZZBannerType
from simnet.utils.player import recognize_zzz_server from simnet.utils.player import recognize_zzz_server
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from metadata.pool.pool import get_pool_by_id from metadata.pool.pool import get_pool_by_id
from modules.gacha_log.const import GACHA_TYPE_LIST from modules.gacha_log.const import GACHA_TYPE_LIST
from modules.gacha_log.error import ( from modules.gacha_log.error import (
@ -35,6 +36,7 @@ from modules.gacha_log.models import (
ZZZGFModel, ZZZGFModel,
) )
from modules.gacha_log.online_view import GachaLogOnlineView from modules.gacha_log.online_view import GachaLogOnlineView
from modules.gacha_log.ranks import GachaLogRanks
from utils.const import PROJECT_ROOT from utils.const import PROJECT_ROOT
from utils.uid import mask_number from utils.uid import mask_number
@ -46,8 +48,14 @@ GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "signal_log")
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True) GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
class GachaLog(GachaLogOnlineView): class GachaLog(GachaLogOnlineView, GachaLogRanks):
def __init__(self, gacha_log_path: Path = GACHA_LOG_PATH): def __init__(
self,
gacha_log_path: Path = GACHA_LOG_PATH,
gacha_log_rank_service: GachaLogRankService = None,
):
GachaLogOnlineView.__init__(self)
GachaLogRanks.__init__(self, gacha_log_rank_service)
self.gacha_log_path = gacha_log_path self.gacha_log_path = gacha_log_path
@staticmethod @staticmethod
@ -306,6 +314,7 @@ class GachaLog(GachaLogOnlineView):
gacha_log.update_time = datetime.datetime.now() gacha_log.update_time = datetime.datetime.now()
gacha_log.import_type = ImportType.PaiGram.value gacha_log.import_type = ImportType.PaiGram.value
await self.save_gacha_log_info(str(user_id), str(player_id), gacha_log) await self.save_gacha_log_info(str(user_id), str(player_id), gacha_log)
await self.recount_one_from_uid(user_id, player_id)
return new_num return new_num
@staticmethod @staticmethod
@ -337,7 +346,7 @@ class GachaLog(GachaLogOnlineView):
isUp, isBig = False, False isUp, isBig = False, False
data = { data = {
"name": item.name, "name": item.name,
"icon": assets.avatar.normal(item.name).as_uri(), "icon": assets.avatar.normal(item.name).as_uri() if assets else "",
"count": count, "count": count,
"type": "代理人", "type": "代理人",
"isUp": isUp, "isUp": isUp,
@ -348,7 +357,7 @@ class GachaLog(GachaLogOnlineView):
elif item.item_type == "音擎" and pool_name in {"音擎调频", "常驻调频"}: elif item.item_type == "音擎" and pool_name in {"音擎调频", "常驻调频"}:
data = { data = {
"name": item.name, "name": item.name,
"icon": assets.weapon.icon(item.name).as_uri(), "icon": assets.weapon.icon(item.name).as_uri() if assets else "",
"count": count, "count": count,
"type": "音擎", "type": "音擎",
"isUp": False, "isUp": False,
@ -359,7 +368,7 @@ class GachaLog(GachaLogOnlineView):
elif item.item_type == "邦布" and pool_name in {"邦布调频"}: elif item.item_type == "邦布" and pool_name in {"邦布调频"}:
data = { data = {
"name": item.name, "name": item.name,
"icon": assets.buddy.icon(item.name).as_uri(), "icon": assets.buddy.icon(item.name).as_uri() if assets else "",
"count": count, "count": count,
"type": "邦布", "type": "邦布",
"isUp": False, "isUp": False,
@ -387,7 +396,7 @@ class GachaLog(GachaLogOnlineView):
if item.item_type == "代理人": if item.item_type == "代理人":
data = { data = {
"name": item.name, "name": item.name,
"icon": assets.avatar.normal(item.name).as_uri(), "icon": assets.avatar.normal(item.name).as_uri() if assets else "",
"count": count, "count": count,
"type": "代理人", "type": "代理人",
"time": item.time, "time": item.time,
@ -396,7 +405,7 @@ class GachaLog(GachaLogOnlineView):
elif item.item_type == "音擎": elif item.item_type == "音擎":
data = { data = {
"name": item.name, "name": item.name,
"icon": assets.weapon.icon(item.name).as_uri(), "icon": assets.weapon.icon(item.name).as_uri() if assets else "",
"count": count, "count": count,
"type": "音擎", "type": "音擎",
"time": item.time, "time": item.time,
@ -405,7 +414,7 @@ class GachaLog(GachaLogOnlineView):
elif item.item_type == "邦布": elif item.item_type == "邦布":
data = { data = {
"name": item.name, "name": item.name,
"icon": assets.buddy.icon(item.name).as_uri(), "icon": assets.buddy.icon(item.name).as_uri() if assets else "",
"count": count, "count": count,
"type": "邦布", "type": "邦布",
"time": item.time, "time": item.time,
@ -552,6 +561,19 @@ class GachaLog(GachaLogOnlineView):
gacha_log, status = await self.load_history_info(str(user_id), str(player_id)) gacha_log, status = await self.load_history_info(str(user_id), str(player_id))
if not status: if not status:
raise GachaLogNotFound raise GachaLogNotFound
return await self.get_analysis_data(gacha_log, pool, assets)
async def get_analysis_data(
self, gacha_log: "GachaLogInfo", pool: ZZZBannerType, assets: Optional["AssetsService"]
):
"""
获取抽卡记录分析数据
:param gacha_log: 抽卡记录
:param pool: 池子类型
:param assets: 资源服务
:return: 分析数据
"""
player_id = gacha_log.uid
pool_name = GACHA_TYPE_LIST[pool] pool_name = GACHA_TYPE_LIST[pool]
if pool_name not in gacha_log.item_list: if pool_name not in gacha_log.item_list:
raise GachaLogNotFound raise GachaLogNotFound

144
modules/gacha_log/ranks.py Normal file
View File

@ -0,0 +1,144 @@
import asyncio
import contextlib
from abc import abstractmethod
from pathlib import Path
from typing import List, Optional, TYPE_CHECKING, Dict
from simnet.models.genshin.wish import BannerType
from simnet.models.zzz.wish import ZZZBannerType
from core.services.gacha_log_rank.services import GachaLogRankService
from core.services.gacha_log_rank.models import GachaLogRank, GachaLogTypeEnum, GachaLogQueryTypeEnum
from modules.gacha_log.error import GachaLogNotFound
from modules.gacha_log.models import GachaLogInfo, ImportType
from utils.log import logger
if TYPE_CHECKING:
from core.dependence.assets import AssetsService
from telegram import Message
class GachaLogError(Exception):
"""抽卡记录异常"""
class GachaLogRanks:
"""抽卡记录排行榜"""
gacha_log_path: Path
ITEM_LIST_MAP = {
"代理人调频": GachaLogTypeEnum.CHARACTER,
"音擎调频": GachaLogTypeEnum.WEAPON,
"常驻调频": GachaLogTypeEnum.DEFAULT,
"邦布调频": GachaLogTypeEnum.PET,
}
ITEM_LIST_MAP_REV = {v: k for k, v in ITEM_LIST_MAP.items()}
BANNER_TYPE_MAP = {
"代理人调频": ZZZBannerType.CHARACTER,
"音擎调频": ZZZBannerType.WEAPON,
"常驻调频": ZZZBannerType.PERMANENT,
"邦布调频": ZZZBannerType.BANGBOO,
}
SCORE_TYPE_MAP = {
"五星平均": GachaLogQueryTypeEnum.FIVE_STAR_AVG,
"UP平均": GachaLogQueryTypeEnum.UP_STAR_AVG,
"小保底不歪": GachaLogQueryTypeEnum.NO_WARP,
}
def __init__(
self,
gacha_log_rank_service: GachaLogRankService = None,
):
self.gacha_log_rank_service = gacha_log_rank_service
@staticmethod
@abstractmethod
async def load_json(path):
"""加载json文件"""
@abstractmethod
async def get_analysis_data(self, gacha_log: "GachaLogInfo", pool: BannerType, assets: Optional["AssetsService"]):
"""
获取抽卡记录分析数据
:param gacha_log: 抽卡记录
:param pool: 池子类型
:param assets: 资源服务
:return: 分析数据
"""
def parse_analysis_data(self, player_id: int, rank_type: "GachaLogTypeEnum", data: Dict) -> GachaLogRank:
line = data["line"]
total = data["allNum"]
rank = GachaLogRank(player_id=player_id, type=rank_type, score_1=total)
for l1 in line:
for l2 in l1:
label = l2["lable"]
if label in self.SCORE_TYPE_MAP:
gacha_log_type = self.SCORE_TYPE_MAP[label]
value = int(float(l2["num"]) * 100)
setattr(rank, gacha_log_type.value, value)
return rank
async def recount_one_data(self, file_path: Path) -> List[GachaLogRank]:
"""重新计算一个文件的数据"""
try:
gacha_log = GachaLogInfo.parse_obj(await self.load_json(file_path))
if gacha_log.get_import_type != ImportType.PaiGram:
raise GachaLogError("不支持的抽卡记录类型")
except ValueError as e:
raise GachaLogError from e
player_id = int(gacha_log.uid)
data = []
for k, v in self.BANNER_TYPE_MAP.items():
rank_type = self.ITEM_LIST_MAP[k]
try:
gacha_log_data = await self.get_analysis_data(gacha_log, v, None)
except GachaLogNotFound:
continue
rank = self.parse_analysis_data(player_id, rank_type, gacha_log_data)
data.append(rank)
return data
async def recount_one_from_uid(self, user_id: int, uid: int):
save_path = self.gacha_log_path / f"{user_id}-{uid}.json"
await self.recount_one(save_path)
async def recount_one(self, file_path: Path):
if not file_path.exists():
return
try:
ranks = await self.recount_one_data(file_path)
if ranks:
await self.add_or_update(ranks)
except GachaLogError:
logger.warning("更新抽卡排名失败 file[%s]", file_path)
async def add_or_update(self, ranks: List["GachaLogRank"]):
"""添加或更新用户数据"""
old_ranks = await self.gacha_log_rank_service.get_rank_by_user_id(ranks[0].player_id)
old_ranks_map = {r.type: r for r in old_ranks}
for rank in ranks:
old_rank = old_ranks_map.get(rank.type)
if old_rank:
old_rank.update_by_new(rank)
await self.gacha_log_rank_service.update(old_rank)
else:
await self.gacha_log_rank_service.add(rank)
async def recount_all_data(self, message: "Message"):
"""重新计算所有数据"""
for key1 in GachaLogTypeEnum:
for key2 in GachaLogQueryTypeEnum:
await self.gacha_log_rank_service.del_all_cache_by_type(key1, key2) # noqa
files = [f for f in self.gacha_log_path.glob("*.json") if len(f.stem.split("-")) == 2]
tasks = []
for idx, f in enumerate(files):
tasks.append(self.recount_one(f))
if len(tasks) >= 10:
await asyncio.gather(*tasks)
tasks.clear()
if idx % 10 == 1:
with contextlib.suppress(Exception):
await message.edit_text(f"已处理 {idx + 1}/{len(files)} 个文件")
if tasks:
await asyncio.gather(*tasks)

View File

@ -27,6 +27,7 @@ class SetCommandPlugin(Plugin):
BotCommand("signal_log_export", "导出调频记录"), BotCommand("signal_log_export", "导出调频记录"),
BotCommand("signal_log_delete", "删除调频记录"), BotCommand("signal_log_delete", "删除调频记录"),
BotCommand("signal_log_online_view", "调频记录在线浏览"), BotCommand("signal_log_online_view", "调频记录在线浏览"),
BotCommand("signal_log_rank", "抽卡排行榜"),
BotCommand("avatars", "查询角色练度"), BotCommand("avatars", "查询角色练度"),
BotCommand("player_card", "角色卡片"), BotCommand("player_card", "角色卡片"),
BotCommand("agent_detail", "角色详细信息"), BotCommand("agent_detail", "角色详细信息"),
@ -69,6 +70,7 @@ class SetCommandPlugin(Plugin):
BotCommand("get_chat", "获取会话信息"), BotCommand("get_chat", "获取会话信息"),
BotCommand("add_block", "添加黑名单"), BotCommand("add_block", "添加黑名单"),
BotCommand("del_block", "移除黑名单"), BotCommand("del_block", "移除黑名单"),
BotCommand("signal_log_rank_recount", "重新统计抽卡排行榜"),
] ]
await context.bot.set_my_commands( await context.bot.set_my_commands(
commands=group_command commands=group_command

View File

@ -19,6 +19,7 @@ from core.services.template.services import TemplateService
from gram_core.basemodel import RegionEnum from gram_core.basemodel import RegionEnum
from gram_core.config import config from gram_core.config import config
from gram_core.plugin.methods.inline_use_data import IInlineUseData from gram_core.plugin.methods.inline_use_data import IInlineUseData
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from modules.gacha_log.const import ZZZGF_VERSION, GACHA_TYPE_LIST_REVERSE from modules.gacha_log.const import ZZZGF_VERSION, GACHA_TYPE_LIST_REVERSE
from modules.gacha_log.error import ( from modules.gacha_log.error import (
GachaLogAccountNotFound, GachaLogAccountNotFound,
@ -78,12 +79,13 @@ class WishLogPlugin(Plugin.Conversation):
assets: AssetsService, assets: AssetsService,
cookie_service: CookiesService, cookie_service: CookiesService,
player_info: PlayerInfoSystem, player_info: PlayerInfoSystem,
gacha_log_rank: GachaLogRankService,
): ):
self.template_service = template_service self.template_service = template_service
self.players_service = players_service self.players_service = players_service
self.assets_service = assets self.assets_service = assets
self.cookie_service = cookie_service self.cookie_service = cookie_service
self.gacha_log = GachaLog() self.gacha_log = GachaLog(gacha_log_rank_service=gacha_log_rank)
self.wish_photo = None self.wish_photo = None
self.player_info = player_info self.player_info = player_info
@ -623,6 +625,15 @@ class WishLogPlugin(Plugin.Conversation):
logger.error("申请在线查看调频记录失败", exc_info=e) logger.error("申请在线查看调频记录失败", exc_info=e)
await message.reply_text("申请在线查看调频记录失败,请联系管理员") await message.reply_text("申请在线查看调频记录失败,请联系管理员")
@handler.command(command="signal_log_rank_recount", block=False, admin=True)
async def wish_log_rank_recount(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> None:
user = update.effective_user
logger.info("用户 %s[%s] signal_log_rank_recount 命令请求", user.full_name, user.id)
message = update.effective_message
reply = await message.reply_text("正在重新统计抽卡记录排行榜")
await self.gacha_log.recount_all_data(reply)
await reply.edit_text("重新统计完成")
@staticmethod @staticmethod
async def get_migrate_data( async def get_migrate_data(
old_user_id: int, new_user_id: int, old_players: List["Player"] old_user_id: int, new_user_id: int, old_players: List["Player"]

View File

@ -0,0 +1,352 @@
from typing import TYPE_CHECKING, List, Tuple, Dict
from pydantic import BaseModel
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction
from telegram.error import BadRequest
from telegram.ext import filters
from core.dependence.assets import AssetsService
from core.plugin import Plugin, handler
from core.services.template.models import FileType
from core.services.template.services import TemplateService
from gram_core.config import config
from gram_core.dependence.redisdb import RedisDB
from gram_core.services.gacha_log_rank.models import GachaLogRank, GachaLogTypeEnum, GachaLogQueryTypeEnum
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from gram_core.services.players import PlayersService
from modules.gacha_log.ranks import GachaLogRanks
from plugins.tools.player_info import PlayerInfoSystem
from utils.log import logger
from utils.uid import mask_number
if TYPE_CHECKING:
from telegram import Update
from telegram.ext import ContextTypes
class RankPlayerModel(BaseModel):
player_id: int
num: int
nickname: str
score_1: int
score_2: float
score_3: float
score_4: float
score_5: float
@property
def mask_uid(self) -> str:
return mask_number(self.player_id)
class RankDataModel(BaseModel):
players: List[RankPlayerModel]
count: int
class WishLogRankPlugin(Plugin):
"""抽卡数据排行"""
TYPES = [
("代理人-总抽数", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.TOTAL),
("代理人-五星平均", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
("代理人-UP平均", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.UP_STAR_AVG),
("代理人-小保底百分比", GachaLogTypeEnum.CHARACTER, GachaLogQueryTypeEnum.NO_WARP),
("音擎-总抽数", GachaLogTypeEnum.WEAPON, GachaLogQueryTypeEnum.TOTAL),
("音擎-五星平均", GachaLogTypeEnum.WEAPON, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
("常驻-总抽数", GachaLogTypeEnum.DEFAULT, GachaLogQueryTypeEnum.TOTAL),
("常驻-五星平均", GachaLogTypeEnum.DEFAULT, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
("邦布-总抽数", GachaLogTypeEnum.PET, GachaLogQueryTypeEnum.TOTAL),
("邦布-五星平均", GachaLogTypeEnum.PET, GachaLogQueryTypeEnum.FIVE_STAR_AVG),
]
def __init__(
self,
assets_service: AssetsService = None,
template_service: TemplateService = None,
player_service: PlayersService = None,
redis: RedisDB = None,
player_info: PlayerInfoSystem = None,
gacha_log_rank_service: GachaLogRankService = None,
) -> None:
self.assets_service = assets_service
self.template_service = template_service
self.player_service = player_service
self.redis = redis.client
self.player_info = player_info
self.gacha_log_rank_service = gacha_log_rank_service
self.limit = 20
self.key = "plugins:gacha_log_rank"
self.expire = 30 * 60 # 30 分钟
self.expire2 = 5 * 60 # 5 分钟
self.wish_photo = None
async def get_nickname_from_uid(self, player_id: int) -> str:
nickname = "Unknown"
try:
info = await self.player_info.get_player_info(player_id, nickname)
if info:
nickname = info.nickname or "Unknown"
except Exception:
logger.warning("获取玩家昵称失败 player_id[%s]", player_id)
return nickname
@staticmethod
def mysql_to_model(rank: "GachaLogRank", num: int, nickname: str) -> RankPlayerModel:
return RankPlayerModel(
player_id=rank.player_id,
num=num,
nickname=nickname,
score_1=rank.score_1 or 0,
score_2=(rank.score_2 / 100.0) if rank.score_2 else 0,
score_3=(rank.score_3 / 100.0) if rank.score_3 else 0,
score_4=(rank.score_4 / 100.0) if rank.score_4 else 0,
score_5=(rank.score_5 / 100.0) if rank.score_5 else 0,
)
@staticmethod
def get_desc_type(query_type: "GachaLogQueryTypeEnum") -> bool:
desc = True
if query_type not in (GachaLogQueryTypeEnum.TOTAL, GachaLogQueryTypeEnum.NO_WARP):
desc = False
return desc
async def get_first_rank_players_from_sql(
self, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
real_desc = self.get_desc_type(query_type)
if desc:
real_desc = not real_desc
ranks_uids = await self.gacha_log_rank_service.get_ranks_cache(rank_type, query_type, desc=real_desc)
count = await self.gacha_log_rank_service.get_ranks_length_cache(rank_type, query_type)
uid_list = [int(uid) for uid, _ in ranks_uids]
ranks = await self.gacha_log_rank_service.get_ranks_by_ids(rank_type, uid_list)
players = []
for rank in ranks:
nickname = await self.get_nickname_from_uid(rank.player_id)
players.append(self.mysql_to_model(rank, uid_list.index(rank.player_id) + 1, nickname))
players.sort(key=lambda x: x.num)
return RankDataModel(players=players, count=count)
async def get_first_rank_players_from_cache(
self, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
desc_int = 1 if desc else 0
key = f"{self.key}:{rank_type.value}:{query_type.value}:{desc_int}:total"
data = await self.redis.get(key)
if data:
return RankDataModel.parse_raw(str(data, encoding="utf-8"))
data = await self.get_first_rank_players_from_sql(rank_type, query_type, desc)
await self.redis.set(key, data.json(by_alias=True), ex=self.expire)
return data
async def get_my_players_from_sql(
self, user_id: int, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
players1 = await self.player_service.get_all_by_user_id(user_id)
ranks = await self.gacha_log_rank_service.get_ranks_by_ids(rank_type, [player.player_id for player in players1])
players = []
real_desc = self.get_desc_type(query_type)
if desc:
real_desc = not real_desc
for rank in ranks:
num = await self.gacha_log_rank_service.get_rank_by_player_id_cache(
rank_type, query_type, rank.player_id, desc=real_desc
)
if num is None:
continue
nickname = await self.get_nickname_from_uid(rank.player_id)
players.append(self.mysql_to_model(rank, num + 1, nickname))
players.sort(key=lambda x: x.num)
return RankDataModel(players=players, count=len(players))
async def get_my_players_from_cache(
self, user_id: int, rank_type: "GachaLogTypeEnum", query_type: "GachaLogQueryTypeEnum", desc: bool
) -> RankDataModel:
desc_int = 1 if desc else 0
key = f"{self.key}:{rank_type.value}:{query_type.value}:{desc_int}:{user_id}"
data = await self.redis.get(key)
if data:
return RankDataModel.parse_raw(str(data, encoding="utf-8"))
data = await self.get_my_players_from_sql(user_id, rank_type, query_type, desc)
await self.redis.set(key, data.json(by_alias=True), ex=self.expire2)
return data
@staticmethod
def get_data_key_map_by_type(rank_type: "GachaLogTypeEnum"):
data = {
"总抽数": "score_1",
"五星平均": "score_2",
}
if rank_type == GachaLogTypeEnum.CHARACTER:
data.update(
{
"UP平均": "score_3",
"小保底百分比": "score_4",
}
)
return data
def gen_button(self, user_id: int, desc: bool = False) -> List[List[InlineKeyboardButton]]:
types = [self.TYPES[i : i + 2] for i in range(0, len(self.TYPES), 2)]
if desc:
now_bind, new_bind, now_int, new_int = "非酋榜", "欧皇榜", 1, 0
else:
now_bind, new_bind, now_int, new_int = "欧皇榜", "非酋榜", 0, 1
data = [
[
InlineKeyboardButton(
idx[0],
callback_data=f"wish_log_rank|{user_id}|{idx[1].value}|{idx[2].value}|{now_int}",
)
for idx in id1
]
for id1 in types
]
page_button = [
InlineKeyboardButton(f"当前是{now_bind}", callback_data=f"wish_log_rank_button|{user_id}|ignore"),
InlineKeyboardButton(f"切换到{new_bind}", callback_data=f"wish_log_rank_button|{user_id}|{new_int}"),
]
data.append(page_button)
return data
@handler.command("signal_log_rank", block=False)
@handler.message(filters.Regex(r"^抽卡排行榜(.*)$"), block=False)
async def wish_log_rank(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
user_id = await self.get_real_user_id(update)
message = update.effective_message
buttons = self.gen_button(user_id)
if isinstance(self.wish_photo, str):
photo = self.wish_photo
else:
photo = open("resources/img/wish.jpg", "rb")
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
reply_message = await message.reply_photo(
photo=photo,
caption="请选择你要查询的抽卡排行榜",
reply_markup=InlineKeyboardMarkup(buttons),
)
if reply_message.photo:
self.wish_photo = reply_message.photo[-1].file_id
@handler.callback_query(pattern=r"^wish_log_rank\|", block=False)
async def wish_log_rank_callback(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
async def get_wish_log_rank_callback(
callback_query_data: str,
) -> Tuple[int, GachaLogTypeEnum, GachaLogQueryTypeEnum, bool]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_rank_type = GachaLogTypeEnum(int(_data[2]))
_query_type = GachaLogQueryTypeEnum(_data[3])
_desc = bool(int(_data[4]))
logger.debug(
"callback_query_data函数返回 user_id[%s] rank_type[%s] query_type[%s] desc[%s]",
_user_id,
_rank_type,
_query_type,
_desc,
)
return _user_id, _rank_type, _query_type, _desc
try:
user_id, rank_type, query_type, desc = await get_wish_log_rank_callback(callback_query.data)
except IndexError:
await callback_query.answer("按钮数据已过期,请重新获取。", show_alert=True)
self.add_delete_message_job(message, delay=1)
return
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
await self.render(update, context, user_id, rank_type, query_type, desc)
async def get_render_data(
self,
user_id: int,
rank_type: "GachaLogTypeEnum",
query_type: "GachaLogQueryTypeEnum",
desc: bool,
) -> Dict:
my_data = await self.get_my_players_from_cache(user_id, rank_type, query_type, desc)
list_data = await self.get_first_rank_players_from_cache(rank_type, query_type, desc)
name_card = ""
return {
"data_list": [my_data, list_data],
"count": list_data.count,
"namecard": name_card,
"pool_name": GachaLogRanks.ITEM_LIST_MAP_REV.get(rank_type),
"data_key_map": self.get_data_key_map_by_type(rank_type),
"main_key": query_type,
"desc": desc,
}
async def render(
self,
update: "Update",
_: "ContextTypes.DEFAULT_TYPE",
user_id: int,
rank_type: "GachaLogTypeEnum",
query_type: "GachaLogQueryTypeEnum",
desc: bool = False,
):
callback_query = update.callback_query
message = callback_query.message
await message.reply_chat_action(ChatAction.TYPING)
render_data = await self.get_render_data(user_id, rank_type, query_type, desc)
try:
await callback_query.answer(text="正在渲染图片中 请稍等 请不要重复点击按钮", show_alert=False)
except BadRequest:
pass
png_data = await self.template_service.render(
"genshin/wish_log_rank/rank.jinja2",
render_data,
viewport={"width": 1040, "height": 500},
full_page=True,
query_selector=".container",
file_type=FileType.PHOTO,
ttl=1 * 60 * 60,
)
await png_data.edit_media(message)
@handler.callback_query(pattern=r"^wish_log_rank_button\|", block=False)
async def wish_log_rank_button_callback(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None:
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
async def get_wish_log_rank_button_callback(
callback_query_data: str,
) -> Tuple[int, bool, bool]:
_data = callback_query_data.split("|")
_user_id = int(_data[1])
_ignore = _data[2] == "ignore"
_desc = False if _ignore else bool(int(_data[2]))
logger.debug(
"callback_query_data函数返回 user_id[%s] ignore[%s] desc[%s]",
_user_id,
_ignore,
_desc,
)
return _user_id, _ignore, _desc
try:
user_id, ignore, desc = await get_wish_log_rank_button_callback(callback_query.data)
except IndexError:
await callback_query.answer("按钮数据已过期,请重新获取。", show_alert=True)
self.add_delete_message_job(message, delay=1)
return
if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n" + config.notice.user_mismatch, show_alert=True)
return
if ignore:
await callback_query.answer("无效按钮", show_alert=False)
return
buttons = self.gen_button(user_id, desc)
await message.edit_reply_markup(reply_markup=InlineKeyboardMarkup(buttons))
await callback_query.answer("已切换", show_alert=False)

View File

@ -0,0 +1,64 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Avatar List</title>
<link type="text/css" href="./style.css" rel="stylesheet"/>
<link type="text/css" href="../avatar_list/style.css" rel="stylesheet"/>
<link type="text/css" href="../../styles/public.css" rel="stylesheet"/>
</head>
<body>
<div class="container">
<div class="head" style="background-image: url('{{ namecard }}')">
<div class="player">
<div>
<div class="nickname">抽卡排行榜 - {{ pool_name }}</div>
{% if desc %}
{% set rank_name = "非酋榜" %}
{% else %}
{% set rank_name = "欧皇榜" %}
{% endif %}
<div class="uid">{{ rank_name }} - 共 {{ count }} 条数据</div>
</div>
</div>
<div class="logo"></div>
</div>
{% for my_data in data_list %}
<div class="content">
<div class="row">
<div>#</div>
<div style="flex: 3">UID</div>
<div style="flex: 3">昵称</div>
{% for key in data_key_map.keys() %}
<div style="flex: 3">{{ key }}</div>
{% endfor %}
</div>
{% for data in my_data.players %}
<div
{% if loop.index is even %}
class="row second-row"
{% else %}
class="row"
{% endif %}
>
<div>{{ data.num }}</div>
<div style="flex: 3">{{ data.mask_uid }}</div>
<div style="flex: 3" class="username">{{ data.nickname }}</div>
{% for value in data_key_map.values() %}
{% if value == main_key %}
{% set style = "background-color: rgb(229 171 229/70%);" %}
{% else %}
{% set style = "" %}
{% endif %}
<div style="flex: 3; {{ style }}">{{ data[value] }}</div>
{% endfor %}
</div>
{% endfor %}
</div>
{% if loop.index == 1 %}
<div style="height: 50px"></div>
{% endif %}
{% endfor %}
</div>
</body>
</html>

View File

@ -0,0 +1,8 @@
.username {
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.player {
margin-left: 50px;
}