From e935d0408250901e8dc532a6116f47c861cdc38d Mon Sep 17 00:00:00 2001
From: omg-xtao <100690902+omg-xtao@users.noreply.github.com>
Date: Sat, 8 Oct 2022 23:40:15 +0800
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20gacha=5Flog=20=E6=94=AF=E6=8C=81?=
=?UTF-8?q?=E5=88=A0=E9=99=A4=E6=95=B0=E6=8D=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
metadata/shortname.py | 1 +
modules/apihelper/gacha_log.py | 95 +++++++++++++++++++++++++++---
plugins/genshin/gacha/gacha_log.py | 79 ++++++++++++++++++++++---
plugins/system/get_chat.py | 17 +++---
4 files changed, 169 insertions(+), 23 deletions(-)
diff --git a/metadata/shortname.py b/metadata/shortname.py
index 6dd62105..d0faf42e 100644
--- a/metadata/shortname.py
+++ b/metadata/shortname.py
@@ -100,6 +100,7 @@ roles = {
10000073: ['纳西妲', 'Nahida', 'nahida', '草王', '草神', '小吉祥草王', '草萝莉', '纳西坦'],
10000074: ['莱依拉', 'Layla', 'layla', '拉一拉'],
}
+not_real_roles = ["纳西妲", "莱依拉"]
weapons = {
"磐岩结绿": ["绿箭", "绿剑"],
"斫峰之刃": ["斫峰", "盾剑"],
diff --git a/modules/apihelper/gacha_log.py b/modules/apihelper/gacha_log.py
index ae63f4e4..cd68a7f0 100644
--- a/modules/apihelper/gacha_log.py
+++ b/modules/apihelper/gacha_log.py
@@ -8,11 +8,11 @@ from typing import List, Dict, Tuple, Optional, Union
import aiofiles
from genshin import Client, InvalidAuthkey
from genshin.models import BannerType
-from pydantic import BaseModel
+from pydantic import BaseModel, validator
from core.base.assets import AssetsService
from metadata.pool.pool import get_pool_by_id
-from metadata.shortname import roleToId, weaponToId
+from metadata.shortname import roleToId, weaponToId, not_real_roles
from utils.const import PROJECT_ROOT
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
@@ -52,6 +52,30 @@ class GachaItem(BaseModel):
rank_type: str
time: datetime.datetime
+ @validator('name')
+ def name_validator(cls, v):
+ if (roleToId(v) or weaponToId(v)) and not not_real_roles(v):
+ return v
+ raise ValueError('Invalid name')
+
+ @validator('gacha_type')
+ def check_gacha_type(cls, v):
+ if v not in {"200", "301", "302", "400"}:
+ raise ValueError("gacha_type must be 200, 301, 302 or 400")
+ return v
+
+ @validator('item_type')
+ def check_item_type(cls, item):
+ if item not in {'角色', '武器'}:
+ raise ValueError('error item type')
+ return item
+
+ @validator('rank_type')
+ def check_rank_type(cls, rank):
+ if rank not in {'5', '4', '3'}:
+ raise ValueError('error rank type')
+ return rank
+
class GachaLogInfo(BaseModel):
user_id: str
@@ -119,19 +143,48 @@ class GachaLog:
await f.write(data)
@staticmethod
- async def load_history_info(user_id: str, uid: str) -> Tuple[GachaLogInfo, bool]:
+ async def load_history_info(user_id: str, uid: str, only_status: bool = False) -> Tuple[Optional[GachaLogInfo], bool]:
"""读取历史抽卡记录数据
:param user_id: 用户id
:param uid: 原神uid
+ :param only_status: 是否只读取状态
:return: 抽卡记录数据
"""
file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
- if file_path.exists():
+ if only_status:
+ return None, file_path.exists()
+ if not file_path.exists():
+ return GachaLogInfo(
+ user_id=user_id,
+ uid=uid,
+ update_time=datetime.datetime.now()
+ ), False
+ try:
return GachaLogInfo.parse_obj(await GachaLog.load_json(file_path)), True
- else:
- return GachaLogInfo(user_id=user_id,
- uid=uid,
- update_time=datetime.datetime.now()), False
+ except json.decoder.JSONDecodeError:
+ return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
+
+ @staticmethod
+ async def remove_history_info(user_id: str, uid: str) -> bool:
+ """删除历史抽卡记录数据
+ :param user_id: 用户id
+ :param uid: 原神uid
+ :return: 是否删除成功
+ """
+ file_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json'
+ file_bak_path = GACHA_LOG_PATH / f'{user_id}-{uid}.json.bak'
+ file_export_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json'
+ with contextlib.suppress(Exception):
+ file_bak_path.unlink(missing_ok=True)
+ with contextlib.suppress(Exception):
+ file_export_path.unlink(missing_ok=True)
+ if file_path.exists():
+ try:
+ file_path.unlink()
+ except PermissionError:
+ return False
+ return True
+ return False
@staticmethod
async def save_gacha_log_info(user_id: str, uid: str, info: GachaLogInfo):
@@ -160,7 +213,7 @@ class GachaLog:
"""
data, state = await GachaLog.load_history_info(user_id, uid)
if not state:
- return False, f'派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~', None
+ return False, '派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~', None
save_path = GACHA_LOG_PATH / f'{user_id}-{uid}-uigf.json'
uigf_dict = {
'info': {
@@ -190,11 +243,31 @@ class GachaLog:
await GachaLog.save_json(save_path, uigf_dict)
return True, '', save_path
+ @staticmethod
+ async def verify_data(data: List[GachaItem]):
+ try:
+ total = len(data)
+ five_star = len([i for i in data if i.rank_type == "5"])
+ four_star = len([i for i in data if i.rank_type == "4"])
+ if total > 50:
+ if total <= five_star * 15:
+ return False, "检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
+ if four_star < five_star:
+ return False, "检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
+ return True, ""
+ except Exception:
+ return False, "导入失败,数据格式错误"
+
@staticmethod
async def import_gacha_log_data(user_id: int, data: dict):
new_num = 0
try:
+ # 检查导入数据是否合法
+ status, text = await GachaLog.verify_data([GachaItem(**i) for i in data['list']])
+ if not status:
+ return text
uid = data['info']['uid']
+ int(uid)
gacha_log, _ = await GachaLog.load_history_info(str(user_id), uid)
for item in data['list']:
pool_name = GACHA_TYPE_LIST[BannerType(int(item['gacha_type']))]
@@ -203,6 +276,10 @@ class GachaLog:
gacha_log.item_list[pool_name].append(item_info)
new_num += 1
for i in gacha_log.item_list.values():
+ # 检查导入后的数据是否合法
+ status, text = await GachaLog.verify_data(i)
+ if not status:
+ return text
i.sort(key=lambda x: (x.time, x.id))
gacha_log.update_time = datetime.datetime.now()
await GachaLog.save_gacha_log_info(str(user_id), uid, gacha_log)
diff --git a/plugins/genshin/gacha/gacha_log.py b/plugins/genshin/gacha/gacha_log.py
index c415452d..47cc4142 100644
--- a/plugins/genshin/gacha/gacha_log.py
+++ b/plugins/genshin/gacha/gacha_log.py
@@ -15,12 +15,13 @@ from core.user import UserService
from core.user.error import UserNotFoundError
from modules.apihelper.gacha_log import GachaLog as GachaLogService
from utils.bot import get_all_args
+from utils.decorators.admins import bot_admins_rights_check
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
from utils.log import logger
-INPUT_URL, INPUT_FILE = 10100, 10101
+INPUT_URL, INPUT_FILE, CONFIRM_DELETE = range(10100, 10103)
class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
@@ -67,8 +68,8 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
document = message.document
if not document.file_name.endswith(".json"):
await message.reply_text("文件格式错误,请发送符合 UIGF 标准的抽卡记录文件")
- if document.file_size > 0.2 * 1024 * 1024:
- await message.reply_text("文件过大,请发送小于 256kb 的文件")
+ if document.file_size > 1 * 1024 * 1024:
+ await message.reply_text("文件过大,请发送小于 1 MB 的文件")
try:
data = BytesIO()
await (await document.get_file()).download(out=data)
@@ -105,10 +106,14 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
elif message.reply_to_message and message.reply_to_message.document:
await self.import_from_file(user, message, document=message.reply_to_message.document)
return ConversationHandler.END
- await message.reply_text("导入祈愿历史记录\n\n"
- "请直接向派蒙发送从游戏中获取到的抽卡记录链接\n\n"
- "获取抽卡记录链接可以参考:https://paimon.moe/wish/import",
- parse_mode="html")
+ await message.reply_text(
+ "导入祈愿历史记录\n\n"
+ "1.请发送从其他工具导出的 UIGF JSON 标准的记录文件\n"
+ "2.你还可以向派蒙发送从游戏中获取到的抽卡记录链接\n\n"
+ "注意:导入的数据将会与旧数据进行合并。\n"
+ "获取抽卡记录链接可以参考:https://paimon.moe/wish/import",
+ parse_mode="html"
+ )
return INPUT_URL
authkey = self.from_url_get_authkey(args[0])
data = await self._refresh_user_data(user, authkey=authkey)
@@ -130,6 +135,66 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
await reply.edit_text(text)
return ConversationHandler.END
+ @conversation.entry_point
+ @handler(CommandHandler, command="gacha_log_delete", filters=filters.ChatType.PRIVATE, block=False)
+ @handler(MessageHandler, filters=filters.Regex("^删除抽卡记录(.*)") & filters.ChatType.PRIVATE, block=False)
+ @restricts()
+ @error_callable
+ async def command_start_delete(self, update: Update, context: CallbackContext) -> int:
+ message = update.effective_message
+ user = update.effective_user
+ logger.info(f"用户 {user.full_name}[{user.id}] 删除抽卡记录命令请求")
+ try:
+ client = await get_genshin_client(user.id, need_cookie=False)
+ context.chat_data["uid"] = client.uid
+ except UserNotFoundError:
+ await message.reply_text("你还没有导入抽卡记录哦~")
+ return ConversationHandler.END
+ _, status = await GachaLogService.load_history_info(str(user.id), str(client.uid), only_status=True)
+ if not status:
+ await message.reply_text("你还没有导入抽卡记录哦~")
+ return ConversationHandler.END
+ await message.reply_text("你确定要删除抽卡记录吗?(此项操作无法恢复),如果确定请发送 ”确定“,发送其他内容取消")
+ return CONFIRM_DELETE
+
+ @conversation.state(state=CONFIRM_DELETE)
+ @handler.message(filters=filters.TEXT & ~filters.COMMAND, block=False)
+ @restricts()
+ @error_callable
+ async def command_confirm_delete(self, update: Update, context: CallbackContext) -> int:
+ message = update.effective_message
+ user = update.effective_user
+ if message.text == "确定":
+ status = await GachaLogService.remove_history_info(str(user.id), str(context.chat_data["uid"]))
+ await message.reply_text("抽卡记录已删除" if status else "抽卡记录删除失败")
+ return ConversationHandler.END
+ await message.reply_text("已取消")
+ return ConversationHandler.END
+
+ @handler(CommandHandler, command="gacha_log_force_delete", block=False)
+ @bot_admins_rights_check
+ async def command_gacha_log_force_delete(self, update: Update, context: CallbackContext):
+ message = update.effective_message
+ args = get_all_args(context)
+ if not args:
+ await message.reply_text("请指定用户ID")
+ return
+ try:
+ cid = int(args[0])
+ if cid < 0:
+ raise ValueError("Invalid cid")
+ client = await get_genshin_client(cid, need_cookie=False)
+ _, status = await GachaLogService.load_history_info(str(cid), str(client.uid), only_status=True)
+ if not status:
+ await message.reply_text("该用户还没有导入抽卡记录")
+ return
+ status = await GachaLogService.remove_history_info(str(cid), str(client.uid))
+ await message.reply_text("抽卡记录已强制删除" if status else "抽卡记录删除失败")
+ except UserNotFoundError:
+ await message.reply_text("该用户暂未绑定账号")
+ except (ValueError, IndexError):
+ await message.reply_text("用户ID 不合法")
+
@handler(CommandHandler, command="gacha_log_export", filters=filters.ChatType.PRIVATE, block=False)
@handler(MessageHandler, filters=filters.Regex("^导出抽卡记录(.*)") & filters.ChatType.PRIVATE, block=False)
@restricts()
diff --git a/plugins/system/get_chat.py b/plugins/system/get_chat.py
index 3cfe57f4..0bffbd47 100644
--- a/plugins/system/get_chat.py
+++ b/plugins/system/get_chat.py
@@ -1,3 +1,4 @@
+import contextlib
from typing import List
from telegram import Update, Chat, ChatMember, ChatMemberOwner, ChatMemberAdministrator
@@ -67,13 +68,15 @@ class GetChat(Plugin):
uid = user_info.genshin_uid or user_info.yuanshen_uid
text += f"{temp}
\n" \
f"游戏 ID:{uid}
"
- gacha_log, status = await GachaLog.load_history_info(str(chat.id), str(uid))
- if status:
- text += f"\n抽卡记录:"
- for key, value in gacha_log.item_list.items():
- text += f"\n - {key}:{len(value)} 条"
- else:
- text += f"\n抽卡记录:未导入
"
+ with contextlib.suppress(Exception):
+ gacha_log, status = await GachaLog.load_history_info(str(chat.id), str(uid))
+ if status:
+ text += f"\n抽卡记录:"
+ for key, value in gacha_log.item_list.items():
+ text += f"\n - {key}:{len(value)} 条"
+ text += f"\n - 最后更新:{gacha_log.update_time.strftime('%Y-%m-%d %H:%M:%S')}"
+ else:
+ text += f"\n抽卡记录:未导入
"
return text
@handler(CommandHandler, command="get_chat", block=False)