gacha_log 支持删除数据

This commit is contained in:
omg-xtao 2022-10-08 23:40:15 +08:00 committed by GitHub
parent 638fe509b8
commit e935d04082
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 169 additions and 23 deletions

View File

@ -100,6 +100,7 @@ roles = {
10000073: ['纳西妲', 'Nahida', 'nahida', '草王', '草神', '小吉祥草王', '草萝莉', '纳西坦'], 10000073: ['纳西妲', 'Nahida', 'nahida', '草王', '草神', '小吉祥草王', '草萝莉', '纳西坦'],
10000074: ['莱依拉', 'Layla', 'layla', '拉一拉'], 10000074: ['莱依拉', 'Layla', 'layla', '拉一拉'],
} }
not_real_roles = ["纳西妲", "莱依拉"]
weapons = { weapons = {
"磐岩结绿": ["绿箭", "绿剑"], "磐岩结绿": ["绿箭", "绿剑"],
"斫峰之刃": ["斫峰", "盾剑"], "斫峰之刃": ["斫峰", "盾剑"],

View File

@ -8,11 +8,11 @@ from typing import List, Dict, Tuple, Optional, Union
import aiofiles import aiofiles
from genshin import Client, InvalidAuthkey from genshin import Client, InvalidAuthkey
from genshin.models import BannerType from genshin.models import BannerType
from pydantic import BaseModel from pydantic import BaseModel, validator
from core.base.assets import AssetsService from core.base.assets import AssetsService
from metadata.pool.pool import get_pool_by_id from metadata.pool.pool import get_pool_by_id
from metadata.shortname import roleToId, weaponToId from metadata.shortname import roleToId, weaponToId, not_real_roles
from utils.const import PROJECT_ROOT from utils.const import PROJECT_ROOT
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log") GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
@ -52,6 +52,30 @@ class GachaItem(BaseModel):
rank_type: str rank_type: str
time: datetime.datetime time: datetime.datetime
@validator('name')
def name_validator(cls, v):
if (roleToId(v) or weaponToId(v)) and not not_real_roles(v):
return v
raise ValueError('Invalid name')
@validator('gacha_type')
def check_gacha_type(cls, v):
if v not in {"200", "301", "302", "400"}:
raise ValueError("gacha_type must be 200, 301, 302 or 400")
return v
@validator('item_type')
def check_item_type(cls, item):
if item not in {'角色', '武器'}:
raise ValueError('error item type')
return item
@validator('rank_type')
def check_rank_type(cls, rank):
if rank not in {'5', '4', '3'}:
raise ValueError('error rank type')
return rank
class GachaLogInfo(BaseModel): class GachaLogInfo(BaseModel):
user_id: str user_id: str
@ -119,19 +143,48 @@ class GachaLog:
await f.write(data) await f.write(data)
@staticmethod @staticmethod
async def load_history_info(user_id: str, uid: str) -> Tuple[GachaLogInfo, bool]: async def load_history_info(user_id: str, uid: str, only_status: bool = False) -> Tuple[Optional[GachaLogInfo], bool]:
"""读取历史抽卡记录数据 """读取历史抽卡记录数据
:param user_id: 用户id :param user_id: 用户id
:param uid: 原神uid :param uid: 原神uid
:param only_status: 是否只读取状态
:return: 抽卡记录数据 :return: 抽卡记录数据
""" """
file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json' file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
if file_path.exists(): if only_status:
return GachaLogInfo.parse_obj(await GachaLog.load_json(file_path)), True return None, file_path.exists()
else: if not file_path.exists():
return GachaLogInfo(user_id=user_id, return GachaLogInfo(
user_id=user_id,
uid=uid, uid=uid,
update_time=datetime.datetime.now()), False update_time=datetime.datetime.now()
), False
try:
return GachaLogInfo.parse_obj(await GachaLog.load_json(file_path)), True
except json.decoder.JSONDecodeError:
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
@staticmethod
async def remove_history_info(user_id: str, uid: str) -> bool:
"""删除历史抽卡记录数据
:param user_id: 用户id
:param uid: 原神uid
:return: 是否删除成功
"""
file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
file_bak_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json.bak'
file_export_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json'
with contextlib.suppress(Exception):
file_bak_path.unlink(missing_ok=True)
with contextlib.suppress(Exception):
file_export_path.unlink(missing_ok=True)
if file_path.exists():
try:
file_path.unlink()
except PermissionError:
return False
return True
return False
@staticmethod @staticmethod
async def save_gacha_log_info(user_id: str, uid: str, info: GachaLogInfo): async def save_gacha_log_info(user_id: str, uid: str, info: GachaLogInfo):
@ -160,7 +213,7 @@ class GachaLog:
""" """
data, state = await GachaLog.load_history_info(user_id, uid) data, state = await GachaLog.load_history_info(user_id, uid)
if not state: if not state:
return False, f'派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~', None return False, '派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~', None
save_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json' save_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json'
uigf_dict = { uigf_dict = {
'info': { 'info': {
@ -190,11 +243,31 @@ class GachaLog:
await GachaLog.save_json(save_path, uigf_dict) await GachaLog.save_json(save_path, uigf_dict)
return True, '', save_path return True, '', save_path
@staticmethod
async def verify_data(data: List[GachaItem]):
try:
total = len(data)
five_star = len([i for i in data if i.rank_type == "5"])
four_star = len([i for i in data if i.rank_type == "4"])
if total > 50:
if total <= five_star * 15:
return False, "检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
if four_star < five_star:
return False, "检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
return True, ""
except Exception:
return False, "导入失败,数据格式错误"
@staticmethod @staticmethod
async def import_gacha_log_data(user_id: int, data: dict): async def import_gacha_log_data(user_id: int, data: dict):
new_num = 0 new_num = 0
try: try:
# 检查导入数据是否合法
status, text = await GachaLog.verify_data([GachaItem(**i) for i in data['list']])
if not status:
return text
uid = data['info']['uid'] uid = data['info']['uid']
int(uid)
gacha_log, _ = await GachaLog.load_history_info(str(user_id), uid) gacha_log, _ = await GachaLog.load_history_info(str(user_id), uid)
for item in data['list']: for item in data['list']:
pool_name = GACHA_TYPE_LIST[BannerType(int(item['gacha_type']))] pool_name = GACHA_TYPE_LIST[BannerType(int(item['gacha_type']))]
@ -203,6 +276,10 @@ class GachaLog:
gacha_log.item_list[pool_name].append(item_info) gacha_log.item_list[pool_name].append(item_info)
new_num += 1 new_num += 1
for i in gacha_log.item_list.values(): for i in gacha_log.item_list.values():
# 检查导入后的数据是否合法
status, text = await GachaLog.verify_data(i)
if not status:
return text
i.sort(key=lambda x: (x.time, x.id)) i.sort(key=lambda x: (x.time, x.id))
gacha_log.update_time = datetime.datetime.now() gacha_log.update_time = datetime.datetime.now()
await GachaLog.save_gacha_log_info(str(user_id), uid, gacha_log) await GachaLog.save_gacha_log_info(str(user_id), uid, gacha_log)

View File

@ -15,12 +15,13 @@ from core.user import UserService
from core.user.error import UserNotFoundError from core.user.error import UserNotFoundError
from modules.apihelper.gacha_log import GachaLog as GachaLogService from modules.apihelper.gacha_log import GachaLog as GachaLogService
from utils.bot import get_all_args from utils.bot import get_all_args
from utils.decorators.admins import bot_admins_rights_check
from utils.decorators.error import error_callable from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client from utils.helpers import get_genshin_client
from utils.log import logger from utils.log import logger
INPUT_URL, INPUT_FILE = 10100, 10101 INPUT_URL, INPUT_FILE, CONFIRM_DELETE = range(10100, 10103)
class GachaLog(Plugin.Conversation, BasePlugin.Conversation): class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
@ -67,8 +68,8 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
document = message.document document = message.document
if not document.file_name.endswith(".json"): if not document.file_name.endswith(".json"):
await message.reply_text("文件格式错误,请发送符合 UIGF 标准的抽卡记录文件") await message.reply_text("文件格式错误,请发送符合 UIGF 标准的抽卡记录文件")
if document.file_size > 0.2 * 1024 * 1024: if document.file_size > 1 * 1024 * 1024:
await message.reply_text("文件过大,请发送小于 256kb 的文件") await message.reply_text("文件过大,请发送小于 1 MB 的文件")
try: try:
data = BytesIO() data = BytesIO()
await (await document.get_file()).download(out=data) await (await document.get_file()).download(out=data)
@ -105,10 +106,14 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
elif message.reply_to_message and message.reply_to_message.document: elif message.reply_to_message and message.reply_to_message.document:
await self.import_from_file(user, message, document=message.reply_to_message.document) await self.import_from_file(user, message, document=message.reply_to_message.document)
return ConversationHandler.END return ConversationHandler.END
await message.reply_text("<b>导入祈愿历史记录</b>\n\n" await message.reply_text(
"请直接向派蒙发送从游戏中获取到的抽卡记录链接\n\n" "<b>导入祈愿历史记录</b>\n\n"
"1.请发送从其他工具导出的 UIGF JSON 标准的记录文件\n"
"2.你还可以向派蒙发送从游戏中获取到的抽卡记录链接\n\n"
"<b>注意:导入的数据将会与旧数据进行合并。</b>\n"
"获取抽卡记录链接可以参考https://paimon.moe/wish/import", "获取抽卡记录链接可以参考https://paimon.moe/wish/import",
parse_mode="html") parse_mode="html"
)
return INPUT_URL return INPUT_URL
authkey = self.from_url_get_authkey(args[0]) authkey = self.from_url_get_authkey(args[0])
data = await self._refresh_user_data(user, authkey=authkey) data = await self._refresh_user_data(user, authkey=authkey)
@ -130,6 +135,66 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
await reply.edit_text(text) await reply.edit_text(text)
return ConversationHandler.END return ConversationHandler.END
@conversation.entry_point
@handler(CommandHandler, command="gacha_log_delete", filters=filters.ChatType.PRIVATE, block=False)
@handler(MessageHandler, filters=filters.Regex("^删除抽卡记录(.*)") & filters.ChatType.PRIVATE, block=False)
@restricts()
@error_callable
async def command_start_delete(self, update: Update, context: CallbackContext) -> int:
message = update.effective_message
user = update.effective_user
logger.info(f"用户 {user.full_name}[{user.id}] 删除抽卡记录命令请求")
try:
client = await get_genshin_client(user.id, need_cookie=False)
context.chat_data["uid"] = client.uid
except UserNotFoundError:
await message.reply_text("你还没有导入抽卡记录哦~")
return ConversationHandler.END
_, status = await GachaLogService.load_history_info(str(user.id), str(client.uid), only_status=True)
if not status:
await message.reply_text("你还没有导入抽卡记录哦~")
return ConversationHandler.END
await message.reply_text("你确定要删除抽卡记录吗?(此项操作无法恢复),如果确定请发送 ”确定“,发送其他内容取消")
return CONFIRM_DELETE
@conversation.state(state=CONFIRM_DELETE)
@handler.message(filters=filters.TEXT & ~filters.COMMAND, block=False)
@restricts()
@error_callable
async def command_confirm_delete(self, update: Update, context: CallbackContext) -> int:
message = update.effective_message
user = update.effective_user
if message.text == "确定":
status = await GachaLogService.remove_history_info(str(user.id), str(context.chat_data["uid"]))
await message.reply_text("抽卡记录已删除" if status else "抽卡记录删除失败")
return ConversationHandler.END
await message.reply_text("已取消")
return ConversationHandler.END
@handler(CommandHandler, command="gacha_log_force_delete", block=False)
@bot_admins_rights_check
async def command_gacha_log_force_delete(self, update: Update, context: CallbackContext):
message = update.effective_message
args = get_all_args(context)
if not args:
await message.reply_text("请指定用户ID")
return
try:
cid = int(args[0])
if cid < 0:
raise ValueError("Invalid cid")
client = await get_genshin_client(cid, need_cookie=False)
_, status = await GachaLogService.load_history_info(str(cid), str(client.uid), only_status=True)
if not status:
await message.reply_text("该用户还没有导入抽卡记录")
return
status = await GachaLogService.remove_history_info(str(cid), str(client.uid))
await message.reply_text("抽卡记录已强制删除" if status else "抽卡记录删除失败")
except UserNotFoundError:
await message.reply_text("该用户暂未绑定账号")
except (ValueError, IndexError):
await message.reply_text("用户ID 不合法")
@handler(CommandHandler, command="gacha_log_export", filters=filters.ChatType.PRIVATE, block=False) @handler(CommandHandler, command="gacha_log_export", filters=filters.ChatType.PRIVATE, block=False)
@handler(MessageHandler, filters=filters.Regex("^导出抽卡记录(.*)") & filters.ChatType.PRIVATE, block=False) @handler(MessageHandler, filters=filters.Regex("^导出抽卡记录(.*)") & filters.ChatType.PRIVATE, block=False)
@restricts() @restricts()

View File

@ -1,3 +1,4 @@
import contextlib
from typing import List from typing import List
from telegram import Update, Chat, ChatMember, ChatMemberOwner, ChatMemberAdministrator from telegram import Update, Chat, ChatMember, ChatMemberOwner, ChatMemberAdministrator
@ -67,11 +68,13 @@ class GetChat(Plugin):
uid = user_info.genshin_uid or user_info.yuanshen_uid uid = user_info.genshin_uid or user_info.yuanshen_uid
text += f"<code>{temp}</code>\n" \ text += f"<code>{temp}</code>\n" \
f"游戏 ID<code>{uid}</code>" f"游戏 ID<code>{uid}</code>"
with contextlib.suppress(Exception):
gacha_log, status = await GachaLog.load_history_info(str(chat.id), str(uid)) gacha_log, status = await GachaLog.load_history_info(str(chat.id), str(uid))
if status: if status:
text += f"\n抽卡记录:" text += f"\n抽卡记录:"
for key, value in gacha_log.item_list.items(): for key, value in gacha_log.item_list.items():
text += f"\n - {key}{len(value)}" text += f"\n - {key}{len(value)}"
text += f"\n - 最后更新:{gacha_log.update_time.strftime('%Y-%m-%d %H:%M:%S')}"
else: else:
text += f"\n抽卡记录:<code>未导入</code>" text += f"\n抽卡记录:<code>未导入</code>"
return text return text