From e2d5b7ee70ded6ea7a313167545e44b7d7c54eaa Mon Sep 17 00:00:00 2001 From: xtaodada Date: Mon, 2 Dec 2024 00:24:40 +0800 Subject: [PATCH] :sparkles: Support uigf v4.0 --- modules/gacha_log/const.py | 2 +- modules/gacha_log/log.py | 50 +++++++++-------------------- modules/gacha_log/models.py | 26 +++++++-------- modules/gacha_log/uigf.py | 63 +++++++++++++++++++++++++++++++++++++ plugins/genshin/wish_log.py | 30 ++++++++++++------ 5 files changed, 110 insertions(+), 61 deletions(-) create mode 100644 modules/gacha_log/uigf.py diff --git a/modules/gacha_log/const.py b/modules/gacha_log/const.py index 4635d146..ef63f5c6 100644 --- a/modules/gacha_log/const.py +++ b/modules/gacha_log/const.py @@ -1,7 +1,7 @@ from simnet.models.genshin.wish import BannerType PAIMONMOE_VERSION = 3 -UIGF_VERSION = "v3.0" +UIGF_VERSION = "v4.0" GACHA_TYPE_LIST = { diff --git a/modules/gacha_log/log.py b/modules/gacha_log/log.py index d4ed4eb9..a6177e81 100644 --- a/modules/gacha_log/log.py +++ b/modules/gacha_log/log.py @@ -41,9 +41,11 @@ from modules.gacha_log.models import ( UIGFInfo, UIGFItem, UIGFModel, + UIGFListInfo, ) from modules.gacha_log.online_view import GachaLogOnlineView from modules.gacha_log.ranks import GachaLogRanks +from modules.gacha_log.uigf import GachaLogUigfConverter from utils.const import PROJECT_ROOT from utils.uid import mask_number @@ -55,7 +57,7 @@ GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log") GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True) -class GachaLog(GachaLogOnlineView, GachaLogRanks): +class GachaLog(GachaLogOnlineView, GachaLogRanks, GachaLogUigfConverter): def __init__( self, gacha_log_path: Path = GACHA_LOG_PATH, @@ -151,34 +153,6 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks): # 写入数据 await self.save_json(save_path, info.json()) - async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]: - """抽卡日记转换为 UIGF 格式 - :param user_id: 用户ID - :param uid: 游戏UID - :return: 转换是否成功、转换信息、UIGF文件目录 - """ - data, state = await self.load_history_info(user_id, uid) - if not state: - raise GachaLogNotFound - save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json" - info = UIGFModel(info=UIGFInfo(uid=uid, export_app=ImportType.PaiGram.value, export_app_version="v3"), list=[]) - for items in data.item_list.values(): - for item in items: - info.list.append( - UIGFItem( - id=item.id, - name=item.name, - gacha_type=item.gacha_type, - item_id=roleToId(item.name) if item.item_type == "角色" else weaponToId(item.name), - item_type=item.item_type, - rank_type=item.rank_type, - time=item.time.strftime("%Y-%m-%d %H:%M:%S"), - uigf_gacha_type=item.gacha_type if item.gacha_type != "400" else "301", - ) - ) - await self.save_json(save_path, json.loads(info.json())) - return save_path - @staticmethod async def verify_data(data: List[GachaItem]) -> bool: try: @@ -216,17 +190,20 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks): async def import_gacha_log_data(self, user_id: int, player_id: int, data: dict, verify_uid: bool = True) -> int: new_num = 0 try: - uid = data["info"]["uid"] - if not verify_uid: - uid = player_id - elif int(uid) != player_id: + _data, uid = None, None + for _i in data.get("hk4e", []): + uid = _i.get("uid", "0") + if (not verify_uid) or int(uid) == player_id: + _data = _i + break + if not _data or not uid: raise GachaLogAccountNotFound try: import_type = ImportType(data["info"]["export_app"]) except ValueError: import_type = ImportType.UNKNOWN # 检查导入数据是否合法 - all_items = [GachaItem(**i) for i in data["list"]] + all_items = [GachaItem(**i) for i in _data["list"]] await self.verify_data(all_items) gacha_log, status = await self.load_history_info(str(user_id), uid) if import_type == ImportType.PAIMONMOE: @@ -808,7 +785,8 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks): UIGFGachaType.CHARACTER: "角色活动祈愿", UIGFGachaType.WEAPON: "武器活动祈愿", } - data = UIGFModel(info=UIGFInfo(export_app=import_type.value), list=[]) + data = UIGFListInfo(list=[]) + info = UIGFModel(info=UIGFInfo(export_app=import_type.value), hk4e=[data], hkrpg=[], nap=[]) if import_type == ImportType.PAIMONMOE: ws = wb["Information"] if ws["B2"].value != PAIMONMOE_VERSION: @@ -852,4 +830,4 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks): break data.list.append(from_fxq(gacha_type, row[2], row[1], row[0], row[3], row[6])) - return json.loads(data.json()) + return json.loads(info.model_dump_json()) diff --git a/modules/gacha_log/models.py b/modules/gacha_log/models.py index 3588d131..3e4d29e2 100644 --- a/modules/gacha_log/models.py +++ b/modules/gacha_log/models.py @@ -1,6 +1,6 @@ import datetime from enum import Enum -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Union, Optional from pydantic import field_validator @@ -160,34 +160,32 @@ class UIGFItem(BaseModel): rank_type: str time: str uigf_gacha_type: UIGFGachaType + gacha_id: Optional[str] = "" class UIGFInfo(BaseModel): - uid: str = "0" - lang: str = "zh-cn" export_time: str = "" export_timestamp: int = 0 export_app: str = "" export_app_version: str = "" - uigf_version: str = UIGF_VERSION - region_time_zone: int = 8 + version: str = UIGF_VERSION def __init__(self, **data: Any): - data["region_time_zone"] = data.get("region_time_zone", UIGFInfo.get_region_time_zone(data.get("uid", "0"))) super().__init__(**data) if not self.export_time: self.export_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.export_timestamp = int(datetime.datetime.now().timestamp()) - @staticmethod - def get_region_time_zone(uid: str) -> int: - if uid.startswith("6"): - return -5 - if uid.startswith("7"): - return 1 - return 8 + +class UIGFListInfo(BaseModel): + uid: int = 0 + timezone: int = 8 + lang: str = "zh-cn" + list: List[UIGFItem] class UIGFModel(BaseModel): info: UIGFInfo - list: List[UIGFItem] + hk4e: List[UIGFListInfo] + hkrpg: List[UIGFListInfo] + nap: List[UIGFListInfo] diff --git a/modules/gacha_log/uigf.py b/modules/gacha_log/uigf.py new file mode 100644 index 00000000..8e57d602 --- /dev/null +++ b/modules/gacha_log/uigf.py @@ -0,0 +1,63 @@ +import ujson +from abc import abstractmethod +from pathlib import Path +from typing import Optional, Tuple, TYPE_CHECKING + +from metadata.shortname import roleToId, weaponToId +from modules.gacha_log.error import GachaLogNotFound +from modules.gacha_log.models import UIGFModel, UIGFInfo, ImportType, UIGFListInfo, UIGFItem + +if TYPE_CHECKING: + from modules.gacha_log.models import GachaLogInfo + + +class GachaLogUigfConverter: + """抽卡记录导出为 uigf 标准""" + + gacha_log_path: Path + + @staticmethod + @abstractmethod + async def save_json(path, data): + """保存json文件""" + + @abstractmethod + async def load_history_info( + self, user_id: str, uid: str, only_status: bool = False + ) -> Tuple[Optional["GachaLogInfo"], bool]: + """读取历史抽卡记录数据 + :param user_id: 用户id + :param uid: 原神uid + :param only_status: 是否只读取状态 + :return: 抽卡记录数据 + """ + + async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]: + """抽卡记录转换为 UIGF 格式 + :param user_id: 用户ID + :param uid: 游戏UID + :return: 转换是否成功、转换信息、UIGF文件目录 + """ + data, state = await self.load_history_info(user_id, uid) + if not state: + raise GachaLogNotFound + save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json" + i = UIGFInfo(export_app=ImportType.PaiGram.value, export_app_version="v4") + list_info = UIGFListInfo(uid=int(uid), list=[]) + info = UIGFModel(info=i, hk4e=[list_info], hkrpg=[], nap=[]) + for items in data.item_list.values(): + for item in items: + list_info.list.append( + UIGFItem( + id=item.id, + name=item.name, + gacha_type=item.gacha_type, + item_id=roleToId(item.name) if item.item_type == "角色" else weaponToId(item.name), + item_type=item.item_type, + rank_type=item.rank_type, + time=item.time.strftime("%Y-%m-%d %H:%M:%S"), + uigf_gacha_type=item.gacha_type if item.gacha_type != "400" else "301", + ) + ) + await self.save_json(save_path, ujson.loads(info.model_dump_json())) + return save_path diff --git a/plugins/genshin/wish_log.py b/plugins/genshin/wish_log.py index 6f0da5b2..64755d54 100644 --- a/plugins/genshin/wish_log.py +++ b/plugins/genshin/wish_log.py @@ -62,7 +62,7 @@ if TYPE_CHECKING: from gram_core.services.template.models import RenderResult INPUT_URL, INPUT_LAZY, CONFIRM_DELETE = range(10100, 10103) -WAITING = f"小{config.notice.bot_name}正在从服务器获取数据,请稍后" +WAITING = f"小{config.notice.bot_name}正在从服务器获取数据,请稍候" WISHLOG_NOT_FOUND = f"{config.notice.bot_name}没有找到你的抽卡记录,快来私聊{config.notice.bot_name}导入吧~" WISHLOG_WEB = """抽卡记录详细信息查询 @@ -174,11 +174,12 @@ class WishLogPlugin(Plugin.Conversation): file_type = "json" else: await message.reply_text( - "文件格式错误,请发送符合 UIGF 标准的抽卡记录文件或者 paimon.moe、非小酋导出的 xlsx 格式的抽卡记录文件" + "文件格式错误,请发送符合 UIGF 标准的抽卡记录文件或者 paimon.moe、非小酋导出的 xlsx 格式的抽卡记录文件", + reply_markup=ReplyKeyboardRemove(), ) return if document.file_size > 5 * 1024 * 1024: - await message.reply_text("文件过大,请发送小于 5 MB 的文件") + await message.reply_text("文件过大,请发送小于 5 MB 的文件", reply_markup=ReplyKeyboardRemove()) return try: out = BytesIO() @@ -189,32 +190,41 @@ class WishLogPlugin(Plugin.Conversation): elif file_type == "xlsx": data = self.gacha_log.convert_xlsx_to_uigf(out, self.zh_dict) else: - await message.reply_text("文件解析失败,请检查文件") + await message.reply_text("文件解析失败,请检查文件", reply_markup=ReplyKeyboardRemove()) return except PaimonMoeGachaLogFileError as exc: await message.reply_text( - f"导入失败,PaimonMoe的抽卡记录当前版本不支持\n支持抽卡记录的版本为 {exc.support_version},你的抽卡记录版本为 {exc.file_version}" + f"导入失败,PaimonMoe的抽卡记录当前版本不支持\n支持抽卡记录的版本为 {exc.support_version},你的抽卡记录版本为 {exc.file_version}", + reply_markup=ReplyKeyboardRemove(), ) return except GachaLogFileError: - await message.reply_text(f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准") + await message.reply_text( + f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准", reply_markup=ReplyKeyboardRemove() + ) return except (KeyError, IndexError, ValueError): - await message.reply_text(f"文件解析失败,请检查文件编码是否正确或符合 UIGF {UIGF_VERSION} 标准") + await message.reply_text( + f"文件解析失败,请检查文件编码是否正确或符合 UIGF {UIGF_VERSION} 标准", + reply_markup=ReplyKeyboardRemove(), + ) return except Exception as exc: logger.error("文件解析失败 %s", repr(exc)) - await message.reply_text(f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准") + await message.reply_text( + f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准", reply_markup=ReplyKeyboardRemove() + ) return await message.reply_chat_action(ChatAction.TYPING) - reply = await message.reply_text("文件解析成功,正在导入数据") + reply = await message.reply_text("文件解析成功,正在导入数据", reply_markup=ReplyKeyboardRemove()) await message.reply_chat_action(ChatAction.TYPING) try: text = await self._refresh_user_data(user, player_id, data=data, verify_uid=file_type == "json") except Exception as exc: # pylint: disable=W0703 logger.error("文件解析失败 %s", repr(exc)) text = f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准" - await reply.edit_text(text) + self.add_delete_message_job(reply, delay=1) + await message.reply_text(text, reply_markup=ReplyKeyboardRemove()) async def can_gen_authkey(self, user_id: int, player_id: int) -> bool: player_info = await self.players_service.get_player(user_id, region=RegionEnum.HYPERION, player_id=player_id)