Support uigf v4.0

This commit is contained in:
xtaodada 2024-12-01 23:48:45 +08:00
parent a65688b799
commit 856d27d7e1
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
5 changed files with 116 additions and 67 deletions

View File

@ -1,6 +1,6 @@
from simnet.models.zzz.wish import ZZZBannerType
ZZZGF_VERSION = "v1.0"
UIGF_VERSION = "v4.0"
GACHA_TYPE_LIST = {

View File

@ -32,12 +32,10 @@ from modules.gacha_log.models import (
GachaLogInfo,
ImportType,
Pool,
ZZZGFInfo,
ZZZGFItem,
ZZZGFModel,
)
from modules.gacha_log.online_view import GachaLogOnlineView
from modules.gacha_log.ranks import GachaLogRanks
from modules.gacha_log.uigf import GachaLogUigfConverter
from utils.const import PROJECT_ROOT
from utils.uid import mask_number
@ -49,7 +47,7 @@ GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "signal_log")
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
class GachaLog(GachaLogOnlineView, GachaLogRanks):
class GachaLog(GachaLogOnlineView, GachaLogRanks, GachaLogUigfConverter):
def __init__(
self,
gacha_log_path: Path = GACHA_LOG_PATH,
@ -145,36 +143,6 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks):
# 写入数据
await self.save_json(save_path, info.json())
async def gacha_log_to_zzzgf(self, user_id: str, uid: str) -> Optional[Path]:
"""调频日记转换为 ZZZGF 格式
:param user_id: 用户ID
:param uid: 游戏UID
:return: 转换是否成功转换信息ZZZGF 文件目录
"""
data, state = await self.load_history_info(user_id, uid)
if not state:
raise GachaLogNotFound
save_path = self.gacha_log_path / f"{user_id}-{uid}-zzzgf.json"
info = ZZZGFModel(
info=ZZZGFInfo(uid=uid, export_app=ImportType.PaiGram.value, export_app_version="v4"), list=[]
)
for items in data.item_list.values():
for item in items:
info.list.append(
ZZZGFItem(
id=item.id,
name=item.name,
gacha_id=item.gacha_id,
gacha_type=item.gacha_type,
item_id=item.item_id,
item_type=item.item_type,
rank_type=item.rank_type,
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
)
)
await self.save_json(save_path, json.loads(info.json()))
return save_path
@staticmethod
async def verify_data(data: List[GachaItem]) -> bool:
try:
@ -212,17 +180,20 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks):
async def import_gacha_log_data(self, user_id: int, player_id: int, data: dict, verify_uid: bool = True) -> int:
new_num = 0
try:
uid = data["info"]["uid"]
if not verify_uid:
uid = player_id
elif int(uid) != player_id:
_data, uid = None, None
for _i in data.get("nap", []):
uid = _i.get("uid", "0")
if (not verify_uid) or int(uid) == player_id:
_data = _i
break
if not _data or not uid:
raise GachaLogAccountNotFound
try:
import_type = ImportType(data["info"]["export_app"])
except ValueError:
import_type = ImportType.UNKNOWN
# 检查导入数据是否合法
all_items = [GachaItem(**i) for i in data["list"]]
all_items = [GachaItem(**i) for i in _data["list"]]
await self.verify_data(all_items)
gacha_log, status = await self.load_history_info(str(user_id), uid)
# 将唯一 id 放入临时数据中,加快查找速度

View File

@ -1,13 +1,13 @@
import datetime
from enum import Enum
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Union, Optional
from pydantic import field_validator
from simnet.models.base import APIModel as BaseModel, DateTimeField, add_timezone
from metadata.shortname import not_real_roles, roleToId, weaponToId, buddyToId
from modules.gacha_log.const import ZZZGF_VERSION
from modules.gacha_log.const import UIGF_VERSION
class ImportType(Enum):
@ -148,27 +148,25 @@ class ZZZGFGachaType(Enum):
BANGBOO = "5"
class ZZZGFItem(BaseModel):
class UIGFItem(BaseModel):
id: str
name: str
count: str = "1"
gacha_id: str = ""
gacha_type: ZZZGFGachaType
item_id: str = ""
item_type: ItemType
rank_type: str
time: str
uigf_gacha_type: ZZZGFGachaType
gacha_id: Optional[str] = ""
class ZZZGFInfo(BaseModel):
uid: str = "0"
lang: str = "zh-cn"
region_time_zone: int = 8
class UIGFInfo(BaseModel):
export_time: str = ""
export_timestamp: int = 0
export_app: str = ""
export_app_version: str = ""
zzzgf_version: str = ZZZGF_VERSION
version: str = UIGF_VERSION
def __init__(self, **data: Any):
super().__init__(**data)
@ -177,6 +175,15 @@ class ZZZGFInfo(BaseModel):
self.export_timestamp = int(datetime.datetime.now().timestamp())
class ZZZGFModel(BaseModel):
info: ZZZGFInfo
list: List[ZZZGFItem]
class UIGFListInfo(BaseModel):
uid: int = 0
timezone: int = 8
lang: str = "zh-cn"
list: List[UIGFItem]
class UIGFModel(BaseModel):
info: UIGFInfo
hk4e: List[UIGFListInfo]
hkrpg: List[UIGFListInfo]
nap: List[UIGFListInfo]

63
modules/gacha_log/uigf.py Normal file
View File

@ -0,0 +1,63 @@
import ujson
from abc import abstractmethod
from pathlib import Path
from typing import Optional, Tuple, TYPE_CHECKING
from modules.gacha_log.error import GachaLogNotFound
from modules.gacha_log.models import UIGFModel, UIGFInfo, ImportType, UIGFListInfo, UIGFItem
if TYPE_CHECKING:
from modules.gacha_log.models import GachaLogInfo
class GachaLogUigfConverter:
"""抽卡记录导出为 uigf 标准"""
gacha_log_path: Path
@staticmethod
@abstractmethod
async def save_json(path, data):
"""保存json文件"""
@abstractmethod
async def load_history_info(
self, user_id: str, uid: str, only_status: bool = False
) -> Tuple[Optional["GachaLogInfo"], bool]:
"""读取历史抽卡记录数据
:param user_id: 用户id
:param uid: 原神uid
:param only_status: 是否只读取状态
:return: 抽卡记录数据
"""
async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]:
"""抽卡记录转换为 UIGF 格式
:param user_id: 用户ID
:param uid: 游戏UID
:return: 转换是否成功转换信息UIGF文件目录
"""
data, state = await self.load_history_info(user_id, uid)
if not state:
raise GachaLogNotFound
save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
i = UIGFInfo(export_app=ImportType.PaiGram.value, export_app_version="v4")
list_info = UIGFListInfo(uid=int(uid), list=[])
info = UIGFModel(info=i, hk4e=[], hkrpg=[], nap=[list_info])
for items in data.item_list.values():
for item in items:
list_info.list.append(
UIGFItem(
id=item.id,
name=item.name,
gacha_id=item.gacha_id,
gacha_type=item.gacha_type,
item_id=item.item_id,
item_type=item.item_type,
rank_type=item.rank_type,
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
uigf_gacha_type=item.gacha_type,
)
)
await self.save_json(save_path, ujson.loads(info.model_dump_json()))
return save_path

View File

@ -27,7 +27,7 @@ from gram_core.basemodel import RegionEnum
from gram_core.config import config
from gram_core.plugin.methods.inline_use_data import IInlineUseData
from gram_core.services.gacha_log_rank.services import GachaLogRankService
from modules.gacha_log.const import ZZZGF_VERSION, GACHA_TYPE_LIST_REVERSE
from modules.gacha_log.const import GACHA_TYPE_LIST_REVERSE, UIGF_VERSION
from modules.gacha_log.error import (
GachaLogAccountNotFound,
GachaLogAuthkeyTimeout,
@ -59,7 +59,7 @@ if TYPE_CHECKING:
from gram_core.services.template.models import RenderResult
INPUT_URL, INPUT_LAZY, CONFIRM_DELETE = range(10100, 10103)
WAITING = f"{config.notice.bot_name}正在从服务器获取数据,请稍"
WAITING = f"{config.notice.bot_name}正在从服务器获取数据,请稍"
WISHLOG_NOT_FOUND = f"{config.notice.bot_name}没有找到你的调频记录,快来私聊{config.notice.bot_name}导入吧~"
WISHLOG_WEB = """<b>调频记录详细信息查询</b>
@ -83,7 +83,7 @@ class WishLogPlugin(Plugin.Conversation):
IMPORT_HINT = (
"<b>开始导入祈愿历史记录:请通过 https://zzz.rng.moe/en/tracker/import 获取调频记录链接后发送给我"
"(非 zzz.rng.moe 导出的文件数据)</b>\n\n"
f"> 你还可以向{config.notice.bot_name}发送从其他工具导出的 ZZZGF {ZZZGF_VERSION} 标准的记录文件\n"
f"> 你还可以向{config.notice.bot_name}发送从其他工具导出的 UIGF {UIGF_VERSION} 标准的记录文件\n"
"> 在绑定 Cookie 时添加 stoken 可能有特殊效果哦(仅限国服)\n"
"<b>注意:导入的数据将会与旧数据进行合并。</b>"
)
@ -161,10 +161,12 @@ class WishLogPlugin(Plugin.Conversation):
if document.file_name.endswith(".json"):
file_type = "json"
else:
await message.reply_text("文件格式错误,请发送符合 ZZZGF 标准的调频记录文件")
await message.reply_text(
"文件格式错误,请发送符合 UIGF 标准的调频记录文件", reply_markup=ReplyKeyboardRemove()
)
return
if document.file_size > 5 * 1024 * 1024:
await message.reply_text("文件过大,请发送小于 5 MB 的文件")
await message.reply_text("文件过大,请发送小于 5 MB 的文件", reply_markup=ReplyKeyboardRemove())
return
try:
out = BytesIO()
@ -173,17 +175,24 @@ class WishLogPlugin(Plugin.Conversation):
# bytesio to json
data = jsonlib.loads(out.getvalue().decode("utf-8"))
else:
await message.reply_text("文件解析失败,请检查文件")
await message.reply_text("文件解析失败,请检查文件", reply_markup=ReplyKeyboardRemove())
return
except GachaLogFileError:
await message.reply_text("文件解析失败,请检查文件是否符合 ZZZGF 标准")
await message.reply_text(
f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准", reply_markup=ReplyKeyboardRemove()
)
return
except (KeyError, IndexError, ValueError):
await message.reply_text("文件解析失败,请检查文件编码是否正确或符合 ZZZGF 标准")
await message.reply_text(
f"文件解析失败,请检查文件编码是否正确或符合 UIGF {UIGF_VERSION} 标准",
reply_markup=ReplyKeyboardRemove(),
)
return
except Exception as exc:
logger.error("文件解析失败 %s", repr(exc))
await message.reply_text("文件解析失败,请检查文件是否符合 ZZZGF 标准")
await message.reply_text(
f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准", reply_markup=ReplyKeyboardRemove()
)
return
await message.reply_chat_action(ChatAction.TYPING)
reply = await message.reply_text("文件解析成功,正在导入数据", reply_markup=ReplyKeyboardRemove())
@ -192,8 +201,9 @@ class WishLogPlugin(Plugin.Conversation):
text = await self._refresh_user_data(user, player_id, data=data, verify_uid=file_type == "json")
except Exception as exc: # pylint: disable=W0703
logger.error("文件解析失败 %s", repr(exc))
text = "文件解析失败,请检查文件是否符合 ZZZGF 标准"
await reply.edit_text(text)
text = f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准"
self.add_delete_message_job(reply, delay=1)
await message.reply_text(text, reply_markup=ReplyKeyboardRemove())
async def can_gen_authkey(self, user_id: int, player_id: int) -> bool:
player_info = await self.players_service.get_player(user_id, region=RegionEnum.HYPERION, player_id=player_id)
@ -375,11 +385,9 @@ class WishLogPlugin(Plugin.Conversation):
try:
await message.reply_chat_action(ChatAction.TYPING)
player_id = await self.get_player_id(user.id, uid, offset)
path = await self.gacha_log.gacha_log_to_zzzgf(str(user.id), str(player_id))
path = await self.gacha_log.gacha_log_to_uigf(str(user.id), str(player_id))
await message.reply_chat_action(ChatAction.UPLOAD_DOCUMENT)
await message.reply_document(
document=open(path, "rb+"), caption=f"调频记录导出文件 - ZZZGF {ZZZGF_VERSION}"
)
await message.reply_document(document=open(path, "rb+"), caption=f"调频记录导出文件 - UIGF {UIGF_VERSION}")
except GachaLogNotFound:
logger.info("未找到用户 %s[%s] 的调频记录", user.full_name, user.id)
buttons = [