Support uigf v4.0

This commit is contained in:
xtaodada 2024-12-02 00:24:40 +08:00
parent b309be9f64
commit e2d5b7ee70
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
5 changed files with 110 additions and 61 deletions

View File

@ -1,7 +1,7 @@
from simnet.models.genshin.wish import BannerType from simnet.models.genshin.wish import BannerType
PAIMONMOE_VERSION = 3 PAIMONMOE_VERSION = 3
UIGF_VERSION = "v3.0" UIGF_VERSION = "v4.0"
GACHA_TYPE_LIST = { GACHA_TYPE_LIST = {

View File

@ -41,9 +41,11 @@ from modules.gacha_log.models import (
UIGFInfo, UIGFInfo,
UIGFItem, UIGFItem,
UIGFModel, UIGFModel,
UIGFListInfo,
) )
from modules.gacha_log.online_view import GachaLogOnlineView from modules.gacha_log.online_view import GachaLogOnlineView
from modules.gacha_log.ranks import GachaLogRanks from modules.gacha_log.ranks import GachaLogRanks
from modules.gacha_log.uigf import GachaLogUigfConverter
from utils.const import PROJECT_ROOT from utils.const import PROJECT_ROOT
from utils.uid import mask_number from utils.uid import mask_number
@ -55,7 +57,7 @@ GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True) GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
class GachaLog(GachaLogOnlineView, GachaLogRanks): class GachaLog(GachaLogOnlineView, GachaLogRanks, GachaLogUigfConverter):
def __init__( def __init__(
self, self,
gacha_log_path: Path = GACHA_LOG_PATH, gacha_log_path: Path = GACHA_LOG_PATH,
@ -151,34 +153,6 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks):
# 写入数据 # 写入数据
await self.save_json(save_path, info.json()) await self.save_json(save_path, info.json())
async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]:
"""抽卡日记转换为 UIGF 格式
:param user_id: 用户ID
:param uid: 游戏UID
:return: 转换是否成功转换信息UIGF文件目录
"""
data, state = await self.load_history_info(user_id, uid)
if not state:
raise GachaLogNotFound
save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
info = UIGFModel(info=UIGFInfo(uid=uid, export_app=ImportType.PaiGram.value, export_app_version="v3"), list=[])
for items in data.item_list.values():
for item in items:
info.list.append(
UIGFItem(
id=item.id,
name=item.name,
gacha_type=item.gacha_type,
item_id=roleToId(item.name) if item.item_type == "角色" else weaponToId(item.name),
item_type=item.item_type,
rank_type=item.rank_type,
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
uigf_gacha_type=item.gacha_type if item.gacha_type != "400" else "301",
)
)
await self.save_json(save_path, json.loads(info.json()))
return save_path
@staticmethod @staticmethod
async def verify_data(data: List[GachaItem]) -> bool: async def verify_data(data: List[GachaItem]) -> bool:
try: try:
@ -216,17 +190,20 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks):
async def import_gacha_log_data(self, user_id: int, player_id: int, data: dict, verify_uid: bool = True) -> int: async def import_gacha_log_data(self, user_id: int, player_id: int, data: dict, verify_uid: bool = True) -> int:
new_num = 0 new_num = 0
try: try:
uid = data["info"]["uid"] _data, uid = None, None
if not verify_uid: for _i in data.get("hk4e", []):
uid = player_id uid = _i.get("uid", "0")
elif int(uid) != player_id: if (not verify_uid) or int(uid) == player_id:
_data = _i
break
if not _data or not uid:
raise GachaLogAccountNotFound raise GachaLogAccountNotFound
try: try:
import_type = ImportType(data["info"]["export_app"]) import_type = ImportType(data["info"]["export_app"])
except ValueError: except ValueError:
import_type = ImportType.UNKNOWN import_type = ImportType.UNKNOWN
# 检查导入数据是否合法 # 检查导入数据是否合法
all_items = [GachaItem(**i) for i in data["list"]] all_items = [GachaItem(**i) for i in _data["list"]]
await self.verify_data(all_items) await self.verify_data(all_items)
gacha_log, status = await self.load_history_info(str(user_id), uid) gacha_log, status = await self.load_history_info(str(user_id), uid)
if import_type == ImportType.PAIMONMOE: if import_type == ImportType.PAIMONMOE:
@ -808,7 +785,8 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks):
UIGFGachaType.CHARACTER: "角色活动祈愿", UIGFGachaType.CHARACTER: "角色活动祈愿",
UIGFGachaType.WEAPON: "武器活动祈愿", UIGFGachaType.WEAPON: "武器活动祈愿",
} }
data = UIGFModel(info=UIGFInfo(export_app=import_type.value), list=[]) data = UIGFListInfo(list=[])
info = UIGFModel(info=UIGFInfo(export_app=import_type.value), hk4e=[data], hkrpg=[], nap=[])
if import_type == ImportType.PAIMONMOE: if import_type == ImportType.PAIMONMOE:
ws = wb["Information"] ws = wb["Information"]
if ws["B2"].value != PAIMONMOE_VERSION: if ws["B2"].value != PAIMONMOE_VERSION:
@ -852,4 +830,4 @@ class GachaLog(GachaLogOnlineView, GachaLogRanks):
break break
data.list.append(from_fxq(gacha_type, row[2], row[1], row[0], row[3], row[6])) data.list.append(from_fxq(gacha_type, row[2], row[1], row[0], row[3], row[6]))
return json.loads(data.json()) return json.loads(info.model_dump_json())

View File

@ -1,6 +1,6 @@
import datetime import datetime
from enum import Enum from enum import Enum
from typing import Any, Dict, List, Union from typing import Any, Dict, List, Union, Optional
from pydantic import field_validator from pydantic import field_validator
@ -160,34 +160,32 @@ class UIGFItem(BaseModel):
rank_type: str rank_type: str
time: str time: str
uigf_gacha_type: UIGFGachaType uigf_gacha_type: UIGFGachaType
gacha_id: Optional[str] = ""
class UIGFInfo(BaseModel): class UIGFInfo(BaseModel):
uid: str = "0"
lang: str = "zh-cn"
export_time: str = "" export_time: str = ""
export_timestamp: int = 0 export_timestamp: int = 0
export_app: str = "" export_app: str = ""
export_app_version: str = "" export_app_version: str = ""
uigf_version: str = UIGF_VERSION version: str = UIGF_VERSION
region_time_zone: int = 8
def __init__(self, **data: Any): def __init__(self, **data: Any):
data["region_time_zone"] = data.get("region_time_zone", UIGFInfo.get_region_time_zone(data.get("uid", "0")))
super().__init__(**data) super().__init__(**data)
if not self.export_time: if not self.export_time:
self.export_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.export_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.export_timestamp = int(datetime.datetime.now().timestamp()) self.export_timestamp = int(datetime.datetime.now().timestamp())
@staticmethod
def get_region_time_zone(uid: str) -> int: class UIGFListInfo(BaseModel):
if uid.startswith("6"): uid: int = 0
return -5 timezone: int = 8
if uid.startswith("7"): lang: str = "zh-cn"
return 1 list: List[UIGFItem]
return 8
class UIGFModel(BaseModel): class UIGFModel(BaseModel):
info: UIGFInfo info: UIGFInfo
list: List[UIGFItem] hk4e: List[UIGFListInfo]
hkrpg: List[UIGFListInfo]
nap: List[UIGFListInfo]

63
modules/gacha_log/uigf.py Normal file
View File

@ -0,0 +1,63 @@
import ujson
from abc import abstractmethod
from pathlib import Path
from typing import Optional, Tuple, TYPE_CHECKING
from metadata.shortname import roleToId, weaponToId
from modules.gacha_log.error import GachaLogNotFound
from modules.gacha_log.models import UIGFModel, UIGFInfo, ImportType, UIGFListInfo, UIGFItem
if TYPE_CHECKING:
from modules.gacha_log.models import GachaLogInfo
class GachaLogUigfConverter:
"""抽卡记录导出为 uigf 标准"""
gacha_log_path: Path
@staticmethod
@abstractmethod
async def save_json(path, data):
"""保存json文件"""
@abstractmethod
async def load_history_info(
self, user_id: str, uid: str, only_status: bool = False
) -> Tuple[Optional["GachaLogInfo"], bool]:
"""读取历史抽卡记录数据
:param user_id: 用户id
:param uid: 原神uid
:param only_status: 是否只读取状态
:return: 抽卡记录数据
"""
async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]:
"""抽卡记录转换为 UIGF 格式
:param user_id: 用户ID
:param uid: 游戏UID
:return: 转换是否成功转换信息UIGF文件目录
"""
data, state = await self.load_history_info(user_id, uid)
if not state:
raise GachaLogNotFound
save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
i = UIGFInfo(export_app=ImportType.PaiGram.value, export_app_version="v4")
list_info = UIGFListInfo(uid=int(uid), list=[])
info = UIGFModel(info=i, hk4e=[list_info], hkrpg=[], nap=[])
for items in data.item_list.values():
for item in items:
list_info.list.append(
UIGFItem(
id=item.id,
name=item.name,
gacha_type=item.gacha_type,
item_id=roleToId(item.name) if item.item_type == "角色" else weaponToId(item.name),
item_type=item.item_type,
rank_type=item.rank_type,
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
uigf_gacha_type=item.gacha_type if item.gacha_type != "400" else "301",
)
)
await self.save_json(save_path, ujson.loads(info.model_dump_json()))
return save_path

View File

@ -62,7 +62,7 @@ if TYPE_CHECKING:
from gram_core.services.template.models import RenderResult from gram_core.services.template.models import RenderResult
INPUT_URL, INPUT_LAZY, CONFIRM_DELETE = range(10100, 10103) INPUT_URL, INPUT_LAZY, CONFIRM_DELETE = range(10100, 10103)
WAITING = f"{config.notice.bot_name}正在从服务器获取数据,请稍" WAITING = f"{config.notice.bot_name}正在从服务器获取数据,请稍"
WISHLOG_NOT_FOUND = f"{config.notice.bot_name}没有找到你的抽卡记录,快来私聊{config.notice.bot_name}导入吧~" WISHLOG_NOT_FOUND = f"{config.notice.bot_name}没有找到你的抽卡记录,快来私聊{config.notice.bot_name}导入吧~"
WISHLOG_WEB = """<b>抽卡记录详细信息查询</b> WISHLOG_WEB = """<b>抽卡记录详细信息查询</b>
@ -174,11 +174,12 @@ class WishLogPlugin(Plugin.Conversation):
file_type = "json" file_type = "json"
else: else:
await message.reply_text( await message.reply_text(
"文件格式错误,请发送符合 UIGF 标准的抽卡记录文件或者 paimon.moe、非小酋导出的 xlsx 格式的抽卡记录文件" "文件格式错误,请发送符合 UIGF 标准的抽卡记录文件或者 paimon.moe、非小酋导出的 xlsx 格式的抽卡记录文件",
reply_markup=ReplyKeyboardRemove(),
) )
return return
if document.file_size > 5 * 1024 * 1024: if document.file_size > 5 * 1024 * 1024:
await message.reply_text("文件过大,请发送小于 5 MB 的文件") await message.reply_text("文件过大,请发送小于 5 MB 的文件", reply_markup=ReplyKeyboardRemove())
return return
try: try:
out = BytesIO() out = BytesIO()
@ -189,32 +190,41 @@ class WishLogPlugin(Plugin.Conversation):
elif file_type == "xlsx": elif file_type == "xlsx":
data = self.gacha_log.convert_xlsx_to_uigf(out, self.zh_dict) data = self.gacha_log.convert_xlsx_to_uigf(out, self.zh_dict)
else: else:
await message.reply_text("文件解析失败,请检查文件") await message.reply_text("文件解析失败,请检查文件", reply_markup=ReplyKeyboardRemove())
return return
except PaimonMoeGachaLogFileError as exc: except PaimonMoeGachaLogFileError as exc:
await message.reply_text( await message.reply_text(
f"导入失败PaimonMoe的抽卡记录当前版本不支持\n支持抽卡记录的版本为 {exc.support_version},你的抽卡记录版本为 {exc.file_version}" f"导入失败PaimonMoe的抽卡记录当前版本不支持\n支持抽卡记录的版本为 {exc.support_version},你的抽卡记录版本为 {exc.file_version}",
reply_markup=ReplyKeyboardRemove(),
) )
return return
except GachaLogFileError: except GachaLogFileError:
await message.reply_text(f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准") await message.reply_text(
f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准", reply_markup=ReplyKeyboardRemove()
)
return return
except (KeyError, IndexError, ValueError): except (KeyError, IndexError, ValueError):
await message.reply_text(f"文件解析失败,请检查文件编码是否正确或符合 UIGF {UIGF_VERSION} 标准") await message.reply_text(
f"文件解析失败,请检查文件编码是否正确或符合 UIGF {UIGF_VERSION} 标准",
reply_markup=ReplyKeyboardRemove(),
)
return return
except Exception as exc: except Exception as exc:
logger.error("文件解析失败 %s", repr(exc)) logger.error("文件解析失败 %s", repr(exc))
await message.reply_text(f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准") await message.reply_text(
f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准", reply_markup=ReplyKeyboardRemove()
)
return return
await message.reply_chat_action(ChatAction.TYPING) await message.reply_chat_action(ChatAction.TYPING)
reply = await message.reply_text("文件解析成功,正在导入数据") reply = await message.reply_text("文件解析成功,正在导入数据", reply_markup=ReplyKeyboardRemove())
await message.reply_chat_action(ChatAction.TYPING) await message.reply_chat_action(ChatAction.TYPING)
try: try:
text = await self._refresh_user_data(user, player_id, data=data, verify_uid=file_type == "json") text = await self._refresh_user_data(user, player_id, data=data, verify_uid=file_type == "json")
except Exception as exc: # pylint: disable=W0703 except Exception as exc: # pylint: disable=W0703
logger.error("文件解析失败 %s", repr(exc)) logger.error("文件解析失败 %s", repr(exc))
text = f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准" text = f"文件解析失败,请检查文件是否符合 UIGF {UIGF_VERSION} 标准"
await reply.edit_text(text) self.add_delete_message_job(reply, delay=1)
await message.reply_text(text, reply_markup=ReplyKeyboardRemove())
async def can_gen_authkey(self, user_id: int, player_id: int) -> bool: async def can_gen_authkey(self, user_id: int, player_id: int) -> bool:
player_info = await self.players_service.get_player(user_id, region=RegionEnum.HYPERION, player_id=player_id) player_info = await self.players_service.get_player(user_id, region=RegionEnum.HYPERION, player_id=player_id)