mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-16 03:55:26 +00:00
♻️ 重构 gacha_log
Co-authored-by: xtaodada <xtao@xtaolink.cn>
This commit is contained in:
parent
24c63002cd
commit
ab1c490a13
14
metadata/scripts/paimon_moe.py
Normal file
14
metadata/scripts/paimon_moe.py
Normal file
@ -0,0 +1,14 @@
|
||||
from utils.const import PROJECT_ROOT
|
||||
from aiofiles import open as async_open
|
||||
from httpx import AsyncClient, URL
|
||||
|
||||
|
||||
async def update_paimon_moe_zh(overwrite: bool = True):
|
||||
path = PROJECT_ROOT.joinpath("metadata/data/paimon_moe_zh.json")
|
||||
if not overwrite and path.exists():
|
||||
return
|
||||
host = URL("https://raw.fastgit.org/MadeBaruna/paimon-moe/main/src/locales/items/zh.json")
|
||||
client = AsyncClient()
|
||||
text = (await client.get(host)).text
|
||||
async with async_open(path, mode="w", encoding="utf-8") as file:
|
||||
await file.write(text)
|
@ -4,7 +4,7 @@ import functools
|
||||
|
||||
from metadata.genshin import WEAPON_DATA
|
||||
|
||||
__all__ = ["roles", "weapons", "roleToId", "roleToName", "weaponToName", "weaponToId"]
|
||||
__all__ = ["roles", "weapons", "roleToId", "roleToName", "weaponToName", "weaponToId", "not_real_roles"]
|
||||
|
||||
# noinspection SpellCheckingInspection
|
||||
roles = {
|
||||
|
@ -5,14 +5,6 @@ class APIHelperException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogException(APIHelperException):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogAccountNotFound(GachaLogException):
|
||||
pass
|
||||
|
||||
|
||||
class NetworkException(APIHelperException):
|
||||
pass
|
||||
|
||||
|
13
modules/gacha_log/const.py
Normal file
13
modules/gacha_log/const.py
Normal file
@ -0,0 +1,13 @@
|
||||
from genshin.models import BannerType
|
||||
|
||||
PAIMONMOE_VERSION = 3
|
||||
UIGF_VERSION = "v2.2"
|
||||
|
||||
|
||||
GACHA_TYPE_LIST = {
|
||||
BannerType.NOVICE: "新手祈愿",
|
||||
BannerType.PERMANENT: "常驻祈愿",
|
||||
BannerType.WEAPON: "武器祈愿",
|
||||
BannerType.CHARACTER1: "角色祈愿",
|
||||
BannerType.CHARACTER2: "角色祈愿",
|
||||
}
|
29
modules/gacha_log/error.py
Normal file
29
modules/gacha_log/error.py
Normal file
@ -0,0 +1,29 @@
|
||||
class GachaLogException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogFileError(GachaLogException):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogNotFound(GachaLogException):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogAccountNotFound(GachaLogException):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogInvalidAuthkey(GachaLogException):
|
||||
pass
|
||||
|
||||
|
||||
class GachaLogMixedProvider(GachaLogException):
|
||||
pass
|
||||
|
||||
|
||||
class PaimonMoeGachaLogFileError(GachaLogFileError):
|
||||
def __init__(self, file_version: int, support_version: int):
|
||||
super().__init__("Paimon.Moe version not supported")
|
||||
self.support_version = support_version
|
||||
self.file_version = file_version
|
9
modules/gacha_log/helpers.py
Normal file
9
modules/gacha_log/helpers.py
Normal file
@ -0,0 +1,9 @@
|
||||
def from_url_get_authkey(url: str) -> str:
|
||||
"""从 UEL 解析 authkey
|
||||
:param url: URL
|
||||
:return: authkey
|
||||
"""
|
||||
try:
|
||||
return url.split("authkey=")[1].split("&")[0]
|
||||
except IndexError:
|
||||
return url
|
@ -1,138 +1,47 @@
|
||||
import contextlib
|
||||
import datetime
|
||||
import json
|
||||
import time
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Tuple, Optional, Union
|
||||
from typing import List, Tuple, Optional
|
||||
|
||||
import aiofiles
|
||||
from genshin import Client, InvalidAuthkey
|
||||
from genshin.models import BannerType
|
||||
from pydantic import BaseModel, validator
|
||||
from openpyxl import load_workbook
|
||||
|
||||
from core.base.assets import AssetsService
|
||||
from metadata.pool.pool import get_pool_by_id
|
||||
from metadata.shortname import roleToId, weaponToId, not_real_roles
|
||||
from modules.apihelper.error import GachaLogAccountNotFound
|
||||
from utils.const import PROJECT_ROOT
|
||||
from utils.log import logger
|
||||
|
||||
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
|
||||
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
|
||||
GACHA_TYPE_LIST = {
|
||||
BannerType.NOVICE: "新手祈愿",
|
||||
BannerType.PERMANENT: "常驻祈愿",
|
||||
BannerType.WEAPON: "武器祈愿",
|
||||
BannerType.CHARACTER1: "角色祈愿",
|
||||
BannerType.CHARACTER2: "角色祈愿",
|
||||
}
|
||||
|
||||
|
||||
class FiveStarItem(BaseModel):
|
||||
name: str
|
||||
icon: str
|
||||
count: int
|
||||
type: str
|
||||
isUp: bool
|
||||
isBig: bool
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class FourStarItem(BaseModel):
|
||||
name: str
|
||||
icon: str
|
||||
count: int
|
||||
type: str
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class GachaItem(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
gacha_type: str
|
||||
item_type: str
|
||||
rank_type: str
|
||||
time: datetime.datetime
|
||||
|
||||
@validator("name")
|
||||
def name_validator(cls, v):
|
||||
if item_id := (roleToId(v) or weaponToId(v)):
|
||||
if item_id not in not_real_roles:
|
||||
return v
|
||||
raise ValueError("Invalid name")
|
||||
|
||||
@validator("gacha_type")
|
||||
def check_gacha_type(cls, v):
|
||||
if v not in {"100", "200", "301", "302", "400"}:
|
||||
raise ValueError("gacha_type must be 200, 301, 302 or 400")
|
||||
return v
|
||||
|
||||
@validator("item_type")
|
||||
def check_item_type(cls, item):
|
||||
if item not in {"角色", "武器"}:
|
||||
raise ValueError("error item type")
|
||||
return item
|
||||
|
||||
@validator("rank_type")
|
||||
def check_rank_type(cls, rank):
|
||||
if rank not in {"5", "4", "3"}:
|
||||
raise ValueError("error rank type")
|
||||
return rank
|
||||
|
||||
|
||||
class GachaLogInfo(BaseModel):
|
||||
user_id: str
|
||||
uid: str
|
||||
update_time: datetime.datetime
|
||||
item_list: Dict[str, List[GachaItem]] = {
|
||||
"角色祈愿": [],
|
||||
"武器祈愿": [],
|
||||
"常驻祈愿": [],
|
||||
"新手祈愿": [],
|
||||
}
|
||||
|
||||
|
||||
class Pool:
|
||||
def __init__(self, five: List[str], four: List[str], name: str, to: str, **kwargs):
|
||||
self.five = five
|
||||
self.real_name = name
|
||||
self.name = "、".join(self.five)
|
||||
self.four = four
|
||||
self.from_ = kwargs.get("from")
|
||||
self.to = to
|
||||
self.from_time = datetime.datetime.strptime(self.from_, "%Y-%m-%d %H:%M:%S")
|
||||
self.to_time = datetime.datetime.strptime(self.to, "%Y-%m-%d %H:%M:%S")
|
||||
self.start = self.from_time
|
||||
self.start_init = False
|
||||
self.end = self.to_time
|
||||
self.dict = {}
|
||||
self.count = 0
|
||||
|
||||
def parse(self, item: Union[FiveStarItem, FourStarItem]):
|
||||
if self.from_time <= item.time <= self.to_time:
|
||||
if self.dict.get(item.name):
|
||||
self.dict[item.name]["count"] += 1
|
||||
else:
|
||||
self.dict[item.name] = {
|
||||
"name": item.name,
|
||||
"icon": item.icon,
|
||||
"count": 1,
|
||||
"rank_type": 5 if isinstance(item, FiveStarItem) else 4,
|
||||
}
|
||||
|
||||
def count_item(self, item: List[GachaItem]):
|
||||
for i in item:
|
||||
if self.from_time <= i.time <= self.to_time:
|
||||
self.count += 1
|
||||
if not self.start_init:
|
||||
self.start = i.time
|
||||
self.end = i.time
|
||||
|
||||
def to_list(self):
|
||||
return list(self.dict.values())
|
||||
from metadata.shortname import roleToId, weaponToId
|
||||
from modules.gacha_log.const import GACHA_TYPE_LIST, PAIMONMOE_VERSION
|
||||
from modules.gacha_log.error import (
|
||||
GachaLogAccountNotFound,
|
||||
GachaLogInvalidAuthkey,
|
||||
GachaLogException,
|
||||
GachaLogFileError,
|
||||
GachaLogNotFound,
|
||||
PaimonMoeGachaLogFileError,
|
||||
GachaLogMixedProvider,
|
||||
)
|
||||
from modules.gacha_log.models import (
|
||||
GachaItem,
|
||||
FiveStarItem,
|
||||
FourStarItem,
|
||||
Pool,
|
||||
GachaLogInfo,
|
||||
UIGFGachaType,
|
||||
ItemType,
|
||||
ImportType,
|
||||
UIGFModel,
|
||||
UIGFInfo,
|
||||
UIGFItem,
|
||||
)
|
||||
|
||||
|
||||
class GachaLog:
|
||||
def __init__(self, gacha_log_path: Path):
|
||||
self.gacha_log_path = gacha_log_path
|
||||
|
||||
@staticmethod
|
||||
async def load_json(path):
|
||||
async with aiofiles.open(path, "r", encoding="utf-8") as f:
|
||||
@ -145,9 +54,8 @@ class GachaLog:
|
||||
return await f.write(json.dumps(data, ensure_ascii=False, indent=4))
|
||||
await f.write(data)
|
||||
|
||||
@staticmethod
|
||||
async def load_history_info(
|
||||
user_id: str, uid: str, only_status: bool = False
|
||||
self, user_id: str, uid: str, only_status: bool = False
|
||||
) -> Tuple[Optional[GachaLogInfo], bool]:
|
||||
"""读取历史抽卡记录数据
|
||||
:param user_id: 用户id
|
||||
@ -155,26 +63,25 @@ class GachaLog:
|
||||
:param only_status: 是否只读取状态
|
||||
:return: 抽卡记录数据
|
||||
"""
|
||||
file_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json"
|
||||
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
|
||||
if only_status:
|
||||
return None, file_path.exists()
|
||||
if not file_path.exists():
|
||||
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
|
||||
try:
|
||||
return GachaLogInfo.parse_obj(await GachaLog.load_json(file_path)), True
|
||||
return GachaLogInfo.parse_obj(await self.load_json(file_path)), True
|
||||
except json.decoder.JSONDecodeError:
|
||||
return GachaLogInfo(user_id=user_id, uid=uid, update_time=datetime.datetime.now()), False
|
||||
|
||||
@staticmethod
|
||||
async def remove_history_info(user_id: str, uid: str) -> bool:
|
||||
async def remove_history_info(self, user_id: str, uid: str) -> bool:
|
||||
"""删除历史抽卡记录数据
|
||||
:param user_id: 用户id
|
||||
:param uid: 原神uid
|
||||
:return: 是否删除成功
|
||||
"""
|
||||
file_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json"
|
||||
file_bak_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json.bak"
|
||||
file_export_path = GACHA_LOG_PATH / f"{user_id}-{uid}-uigf.json"
|
||||
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
|
||||
file_bak_path = self.gacha_log_path / f"{user_id}-{uid}.json.bak"
|
||||
file_export_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
|
||||
with contextlib.suppress(Exception):
|
||||
file_bak_path.unlink(missing_ok=True)
|
||||
with contextlib.suppress(Exception):
|
||||
@ -187,15 +94,14 @@ class GachaLog:
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
async def save_gacha_log_info(user_id: str, uid: str, info: GachaLogInfo):
|
||||
async def save_gacha_log_info(self, user_id: str, uid: str, info: GachaLogInfo):
|
||||
"""保存抽卡记录数据
|
||||
:param user_id: 用户id
|
||||
:param uid: 原神uid
|
||||
:param info: 抽卡记录数据
|
||||
"""
|
||||
save_path = GACHA_LOG_PATH / f"{user_id}-{uid}.json"
|
||||
save_path_bak = GACHA_LOG_PATH / f"{user_id}-{uid}.json.bak"
|
||||
save_path = self.gacha_log_path / f"{user_id}-{uid}.json"
|
||||
save_path_bak = self.gacha_log_path / f"{user_id}-{uid}.json.bak"
|
||||
# 将旧数据备份一次
|
||||
with contextlib.suppress(PermissionError):
|
||||
if save_path.exists():
|
||||
@ -203,82 +109,73 @@ class GachaLog:
|
||||
save_path_bak.unlink()
|
||||
save_path.rename(save_path.parent / f"{save_path.name}.bak")
|
||||
# 写入数据
|
||||
await GachaLog.save_json(save_path, info.json())
|
||||
await self.save_json(save_path, info.json())
|
||||
|
||||
@staticmethod
|
||||
async def gacha_log_to_uigf(user_id: str, uid: str) -> Tuple[bool, str, Optional[Path]]:
|
||||
async def gacha_log_to_uigf(self, user_id: str, uid: str) -> Optional[Path]:
|
||||
"""抽卡日记转换为 UIGF 格式
|
||||
:param user_id: 用户ID
|
||||
:param uid: 游戏UID
|
||||
:return: 转换是否成功、转换信息、UIGF文件目录
|
||||
"""
|
||||
data, state = await GachaLog.load_history_info(user_id, uid)
|
||||
data, state = await self.load_history_info(user_id, uid)
|
||||
if not state:
|
||||
return False, "派蒙还没有找到你导入的任何抽卡记录哦,快试试导入吧~", None
|
||||
save_path = GACHA_LOG_PATH / f"{user_id}-{uid}-uigf.json"
|
||||
uigf_dict = {
|
||||
"info": {
|
||||
"uid": uid,
|
||||
"lang": "zh-cn",
|
||||
"export_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"export_timestamp": int(time.time()),
|
||||
"export_app": "TGPaimonBot",
|
||||
"export_app_version": "v3",
|
||||
"uigf_version": "v2.2",
|
||||
},
|
||||
"list": [],
|
||||
}
|
||||
raise GachaLogNotFound
|
||||
save_path = self.gacha_log_path / f"{user_id}-{uid}-uigf.json"
|
||||
info = UIGFModel(
|
||||
info=UIGFInfo(uid=uid, export_app=ImportType.TGPaimonBot.value, export_app_version="v3"), list=[]
|
||||
)
|
||||
for items in data.item_list.values():
|
||||
for item in items:
|
||||
uigf_dict["list"].append(
|
||||
{
|
||||
"gacha_type": item.gacha_type,
|
||||
"item_id": "",
|
||||
"count": "1",
|
||||
"time": item.time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"name": item.name,
|
||||
"item_type": item.item_type,
|
||||
"rank_type": item.rank_type,
|
||||
"id": item.id,
|
||||
"uigf_gacha_type": item.gacha_type,
|
||||
}
|
||||
info.list.append(
|
||||
UIGFItem(
|
||||
id=item.id,
|
||||
name=item.name,
|
||||
gacha_type=item.gacha_type,
|
||||
item_type=item.item_type,
|
||||
rank_type=item.rank_type,
|
||||
time=item.time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
uigf_gacha_type=item.gacha_type,
|
||||
)
|
||||
)
|
||||
await GachaLog.save_json(save_path, uigf_dict)
|
||||
return True, "", save_path
|
||||
await self.save_json(save_path, info.dict())
|
||||
return save_path
|
||||
|
||||
@staticmethod
|
||||
async def verify_data(data: List[GachaItem]):
|
||||
async def verify_data(data: List[GachaItem]) -> bool:
|
||||
try:
|
||||
total = len(data)
|
||||
five_star = len([i for i in data if i.rank_type == "5"])
|
||||
four_star = len([i for i in data if i.rank_type == "4"])
|
||||
if total > 50:
|
||||
if total <= five_star * 15:
|
||||
return False, "检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
|
||||
raise GachaLogFileError("检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。")
|
||||
if four_star < five_star:
|
||||
return False, "检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。"
|
||||
return True, ""
|
||||
raise GachaLogFileError("检测到您将要导入的抽卡记录中五星数量过多,可能是由于文件错误导致的,请检查后重新导入。")
|
||||
return True
|
||||
except Exception as exc: # pylint: disable=W0703
|
||||
logger.warning(f"抽卡记录数据验证失败 {repr(exc)}")
|
||||
return False, "导入失败,数据格式错误"
|
||||
raise GachaLogFileError from exc
|
||||
|
||||
@staticmethod
|
||||
async def import_gacha_log_data(
|
||||
user_id: int, client: Client, data: dict, verify_uid: bool = True
|
||||
) -> Tuple[bool, str]:
|
||||
async def import_gacha_log_data(self, user_id: int, client: Client, data: dict, verify_uid: bool = True) -> int:
|
||||
new_num = 0
|
||||
try:
|
||||
if not verify_uid:
|
||||
data["info"]["uid"] = str(client.uid)
|
||||
uid = data["info"]["uid"]
|
||||
if int(uid) != client.uid:
|
||||
if not verify_uid:
|
||||
uid = client.uid
|
||||
elif int(uid) != client.uid:
|
||||
raise GachaLogAccountNotFound
|
||||
try:
|
||||
import_type = ImportType(data["info"]["export_app"])
|
||||
except ValueError:
|
||||
import_type = ImportType.UNKNOWN
|
||||
# 检查导入数据是否合法
|
||||
all_items = [GachaItem(**i) for i in data["list"]]
|
||||
status, text = await GachaLog.verify_data(all_items)
|
||||
if not status:
|
||||
return text
|
||||
gacha_log, _ = await GachaLog.load_history_info(str(user_id), uid)
|
||||
await self.verify_data(all_items)
|
||||
gacha_log, status = await self.load_history_info(str(user_id), uid)
|
||||
if import_type == ImportType.PAIMONMOE:
|
||||
if status and gacha_log.get_import_type != ImportType.PAIMONMOE:
|
||||
raise GachaLogMixedProvider
|
||||
elif status and gacha_log.get_import_type == ImportType.PAIMONMOE:
|
||||
raise GachaLogMixedProvider
|
||||
# 将唯一 id 放入临时数据中,加快查找速度
|
||||
temp_id_data = {
|
||||
pool_name: [i.id for i in pool_data] for pool_name, pool_data in gacha_log.item_list.items()
|
||||
@ -291,30 +188,30 @@ class GachaLog:
|
||||
new_num += 1
|
||||
for i in gacha_log.item_list.values():
|
||||
# 检查导入后的数据是否合法
|
||||
status, text = await GachaLog.verify_data(i)
|
||||
if not status:
|
||||
return text
|
||||
await self.verify_data(i)
|
||||
i.sort(key=lambda x: (x.time, x.id))
|
||||
gacha_log.update_time = datetime.datetime.now()
|
||||
await GachaLog.save_gacha_log_info(str(user_id), uid, gacha_log)
|
||||
return "导入完成,本次没有新增数据" if new_num == 0 else f"导入完成,本次共新增{new_num}条抽卡记录"
|
||||
except GachaLogAccountNotFound:
|
||||
return "导入失败,文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同"
|
||||
gacha_log.import_type = import_type.value
|
||||
await self.save_gacha_log_info(str(user_id), uid, gacha_log)
|
||||
return new_num
|
||||
except GachaLogAccountNotFound as e:
|
||||
raise GachaLogAccountNotFound("导入失败,文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同") from e
|
||||
except GachaLogMixedProvider as e:
|
||||
raise GachaLogMixedProvider from e
|
||||
except Exception as exc:
|
||||
logger.warning(f"导入失败,数据格式错误 {repr(exc)}")
|
||||
return "导入失败,数据格式错误"
|
||||
raise GachaLogException from exc
|
||||
|
||||
@staticmethod
|
||||
async def get_gacha_log_data(user_id: int, client: Client, authkey: str) -> str:
|
||||
"""
|
||||
使用authkey获取抽卡记录数据,并合并旧数据
|
||||
async def get_gacha_log_data(self, user_id: int, client: Client, authkey: str) -> int:
|
||||
"""使用authkey获取抽卡记录数据,并合并旧数据
|
||||
:param user_id: 用户id
|
||||
:param client: genshin client
|
||||
:param authkey: authkey
|
||||
:return: 更新结果
|
||||
"""
|
||||
new_num = 0
|
||||
gacha_log, _ = await GachaLog.load_history_info(str(user_id), str(client.uid))
|
||||
gacha_log, _ = await self.load_history_info(str(user_id), str(client.uid))
|
||||
if gacha_log.get_import_type == ImportType.PAIMONMOE:
|
||||
raise GachaLogMixedProvider
|
||||
# 将唯一 id 放入临时数据中,加快查找速度
|
||||
temp_id_data = {pool_name: [i.id for i in pool_data] for pool_name, pool_data in gacha_log.item_list.items()}
|
||||
try:
|
||||
@ -340,13 +237,14 @@ class GachaLog:
|
||||
gacha_log.item_list[pool_name].append(item)
|
||||
temp_id_data[pool_name].append(item.id)
|
||||
new_num += 1
|
||||
except InvalidAuthkey:
|
||||
return "更新数据失败,authkey 无效"
|
||||
except InvalidAuthkey as exc:
|
||||
raise GachaLogInvalidAuthkey from exc
|
||||
for i in gacha_log.item_list.values():
|
||||
i.sort(key=lambda x: (x.time, x.id))
|
||||
gacha_log.update_time = datetime.datetime.now()
|
||||
await GachaLog.save_gacha_log_info(str(user_id), str(client.uid), gacha_log)
|
||||
return "更新完成,本次没有新增数据" if new_num == 0 else f"更新完成,本次共新增{new_num}条抽卡记录"
|
||||
gacha_log.import_type = ImportType.UIGF.value
|
||||
await self.save_gacha_log_info(str(user_id), str(client.uid), gacha_log)
|
||||
return new_num
|
||||
|
||||
@staticmethod
|
||||
def check_avatar_up(name: str, gacha_time: datetime.datetime) -> bool:
|
||||
@ -364,8 +262,7 @@ class GachaLog:
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
async def get_all_5_star_items(data: List[GachaItem], assets: AssetsService, pool_name: str = "角色祈愿"):
|
||||
async def get_all_5_star_items(self, data: List[GachaItem], assets: AssetsService, pool_name: str = "角色祈愿"):
|
||||
"""
|
||||
获取所有5星角色
|
||||
:param data: 抽卡记录
|
||||
@ -384,7 +281,7 @@ class GachaLog:
|
||||
"icon": (await assets.avatar(roleToId(item.name)).icon()).as_uri(),
|
||||
"count": count,
|
||||
"type": "角色",
|
||||
"isUp": GachaLog.check_avatar_up(item.name, item.time) if pool_name == "角色祈愿" else False,
|
||||
"isUp": self.check_avatar_up(item.name, item.time) if pool_name == "角色祈愿" else False,
|
||||
"isBig": (not result[-1].isUp) if result and pool_name == "角色祈愿" else False,
|
||||
"time": item.time,
|
||||
}
|
||||
@ -567,8 +464,7 @@ class GachaLog:
|
||||
return f"{pool_name} · 非"
|
||||
return pool_name
|
||||
|
||||
@staticmethod
|
||||
async def get_analysis(user_id: int, client: Client, pool: BannerType, assets: AssetsService):
|
||||
async def get_analysis(self, user_id: int, client: Client, pool: BannerType, assets: AssetsService):
|
||||
"""
|
||||
获取抽卡记录分析数据
|
||||
:param user_id: 用户id
|
||||
@ -577,26 +473,26 @@ class GachaLog:
|
||||
:param assets: 资源服务
|
||||
:return: 分析数据
|
||||
"""
|
||||
gacha_log, status = await GachaLog.load_history_info(str(user_id), str(client.uid))
|
||||
gacha_log, status = await self.load_history_info(str(user_id), str(client.uid))
|
||||
if not status:
|
||||
return "派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~"
|
||||
raise GachaLogNotFound
|
||||
pool_name = GACHA_TYPE_LIST[pool]
|
||||
data = gacha_log.item_list[pool_name]
|
||||
total = len(data)
|
||||
if total == 0:
|
||||
return "派蒙没有找到这个卡池的抽卡记录,快来私聊派蒙导入吧~"
|
||||
all_five, no_five_star = await GachaLog.get_all_5_star_items(data, assets, pool_name)
|
||||
all_four, no_four_star = await GachaLog.get_all_4_star_items(data, assets)
|
||||
raise GachaLogNotFound
|
||||
all_five, no_five_star = await self.get_all_5_star_items(data, assets, pool_name)
|
||||
all_four, no_four_star = await self.get_all_4_star_items(data, assets)
|
||||
summon_data = None
|
||||
if pool == BannerType.CHARACTER1:
|
||||
summon_data = GachaLog.get_301_pool_data(total, all_five, no_five_star, no_four_star)
|
||||
pool_name = GachaLog.count_fortune(pool_name, summon_data)
|
||||
summon_data = self.get_301_pool_data(total, all_five, no_five_star, no_four_star)
|
||||
pool_name = self.count_fortune(pool_name, summon_data)
|
||||
elif pool == BannerType.WEAPON:
|
||||
summon_data = GachaLog.get_302_pool_data(total, all_five, all_four, no_five_star, no_four_star)
|
||||
pool_name = GachaLog.count_fortune(pool_name, summon_data, True)
|
||||
summon_data = self.get_302_pool_data(total, all_five, all_four, no_five_star, no_four_star)
|
||||
pool_name = self.count_fortune(pool_name, summon_data, True)
|
||||
elif pool == BannerType.PERMANENT:
|
||||
summon_data = GachaLog.get_200_pool_data(total, all_five, all_four, no_five_star, no_four_star)
|
||||
pool_name = GachaLog.count_fortune(pool_name, summon_data)
|
||||
summon_data = self.get_200_pool_data(total, all_five, all_four, no_five_star, no_four_star)
|
||||
pool_name = self.count_fortune(pool_name, summon_data)
|
||||
last_time = data[0].time.strftime("%Y-%m-%d %H:%M")
|
||||
first_time = data[-1].time.strftime("%Y-%m-%d %H:%M")
|
||||
return {
|
||||
@ -611,10 +507,10 @@ class GachaLog:
|
||||
"fourLog": all_four[:18],
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
async def get_pool_analysis(user_id: int, client: Client, pool: BannerType, assets: AssetsService, group: bool):
|
||||
"""
|
||||
获取抽卡记录分析数据
|
||||
async def get_pool_analysis(
|
||||
self, user_id: int, client: Client, pool: BannerType, assets: AssetsService, group: bool
|
||||
) -> dict:
|
||||
"""获取抽卡记录分析数据
|
||||
:param user_id: 用户id
|
||||
:param client: genshin client
|
||||
:param pool: 池子类型
|
||||
@ -622,16 +518,16 @@ class GachaLog:
|
||||
:param group: 是否群组
|
||||
:return: 分析数据
|
||||
"""
|
||||
gacha_log, status = await GachaLog.load_history_info(str(user_id), str(client.uid))
|
||||
gacha_log, status = await self.load_history_info(str(user_id), str(client.uid))
|
||||
if not status:
|
||||
return "派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~"
|
||||
raise GachaLogNotFound
|
||||
pool_name = GACHA_TYPE_LIST[pool]
|
||||
data = gacha_log.item_list[pool_name]
|
||||
total = len(data)
|
||||
if total == 0:
|
||||
return "派蒙没有找到这个卡池的抽卡记录,快来私聊派蒙导入吧~"
|
||||
all_five, _ = await GachaLog.get_all_5_star_items(data, assets, pool_name)
|
||||
all_four, _ = await GachaLog.get_all_4_star_items(data, assets)
|
||||
raise GachaLogNotFound
|
||||
all_five, _ = await self.get_all_5_star_items(data, assets, pool_name)
|
||||
all_four, _ = await self.get_all_4_star_items(data, assets)
|
||||
pool_data = []
|
||||
up_pool_data = [Pool(**i) for i in get_pool_by_id(pool.value)]
|
||||
for up_pool in up_pool_data:
|
||||
@ -657,3 +553,128 @@ class GachaLog:
|
||||
"pool": pool_data[:6] if group else pool_data,
|
||||
"hasMore": len(pool_data) > 6,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def convert_xlsx_to_uigf(data: BytesIO, zh_dict: dict) -> dict:
|
||||
"""转换 paimone.moe 或 非小酋 导出 xlsx 数据为 UIGF 格式
|
||||
:param zh_dict:
|
||||
:param data: paimon.moe 导出的 xlsx 数据
|
||||
:return: UIGF 格式数据
|
||||
"""
|
||||
|
||||
def from_paimon_moe(
|
||||
uigf_gacha_type: UIGFGachaType, item_type: str, name: str, date_string: str, rank_type: int, _id: int
|
||||
) -> UIGFItem:
|
||||
item_type = ItemType.CHARACTER if item_type == "Character" else ItemType.WEAPON
|
||||
return UIGFItem(
|
||||
id=str(_id),
|
||||
name=zh_dict[name],
|
||||
gacha_type=uigf_gacha_type,
|
||||
item_type=item_type,
|
||||
rank_type=str(rank_type),
|
||||
time=date_string,
|
||||
uigf_gacha_type=uigf_gacha_type,
|
||||
)
|
||||
|
||||
def from_uigf(
|
||||
uigf_gacha_type: str,
|
||||
gacha__type: str,
|
||||
item_type: str,
|
||||
name: str,
|
||||
date_string: str,
|
||||
rank_type: str,
|
||||
_id: str,
|
||||
) -> UIGFItem:
|
||||
return UIGFItem(
|
||||
id=_id,
|
||||
name=name,
|
||||
gacha_type=gacha__type,
|
||||
item_type=item_type,
|
||||
rank_type=rank_type,
|
||||
time=date_string,
|
||||
uigf_gacha_type=uigf_gacha_type,
|
||||
)
|
||||
|
||||
def from_fxq(
|
||||
uigf_gacha_type: UIGFGachaType, item_type: str, name: str, date_string: str, rank_type: int, _id: int
|
||||
) -> UIGFItem:
|
||||
item_type = ItemType.CHARACTER if item_type == "角色" else ItemType.WEAPON
|
||||
return UIGFItem(
|
||||
id=str(_id),
|
||||
name=name,
|
||||
gacha_type=uigf_gacha_type,
|
||||
item_type=item_type,
|
||||
rank_type=str(rank_type),
|
||||
time=date_string,
|
||||
uigf_gacha_type=uigf_gacha_type,
|
||||
)
|
||||
|
||||
wb = load_workbook(data)
|
||||
wb_len = len(wb.worksheets)
|
||||
|
||||
if wb_len == 6:
|
||||
import_type = ImportType.PAIMONMOE
|
||||
elif wb_len == 5:
|
||||
import_type = ImportType.UIGF
|
||||
elif wb_len == 4:
|
||||
import_type = ImportType.FXQ
|
||||
else:
|
||||
raise GachaLogFileError("xlsx 格式错误")
|
||||
|
||||
paimonmoe_sheets = {
|
||||
UIGFGachaType.BEGINNER: "Beginners' Wish",
|
||||
UIGFGachaType.STANDARD: "Standard",
|
||||
UIGFGachaType.CHARACTER: "Character Event",
|
||||
UIGFGachaType.WEAPON: "Weapon Event",
|
||||
}
|
||||
fxq_sheets = {
|
||||
UIGFGachaType.BEGINNER: "新手祈愿",
|
||||
UIGFGachaType.STANDARD: "常驻祈愿",
|
||||
UIGFGachaType.CHARACTER: "角色活动祈愿",
|
||||
UIGFGachaType.WEAPON: "武器活动祈愿",
|
||||
}
|
||||
data = UIGFModel(info=UIGFInfo(export_app=import_type.value), list=[])
|
||||
if import_type == ImportType.PAIMONMOE:
|
||||
ws = wb["Information"]
|
||||
if ws["B2"].value != PAIMONMOE_VERSION:
|
||||
raise PaimonMoeGachaLogFileError(file_version=ws["B2"].value, support_version=PAIMONMOE_VERSION)
|
||||
count = 1
|
||||
for gacha_type in paimonmoe_sheets:
|
||||
ws = wb[paimonmoe_sheets[gacha_type]]
|
||||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||||
if row[0] is None:
|
||||
break
|
||||
data.list.append(from_paimon_moe(gacha_type, row[0], row[1], row[2], row[3], count))
|
||||
count += 1
|
||||
elif import_type == ImportType.UIGF:
|
||||
ws = wb["原始数据"]
|
||||
type_map = {}
|
||||
count = 0
|
||||
for row in ws["1"]:
|
||||
if row.value is None:
|
||||
break
|
||||
type_map[row.value] = count
|
||||
count += 1
|
||||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||||
if row[0] is None:
|
||||
break
|
||||
data.list.append(
|
||||
from_uigf(
|
||||
row[type_map["uigf_gacha_type"]],
|
||||
row[type_map["gacha_type"]],
|
||||
row[type_map["item_type"]],
|
||||
row[type_map["name"]],
|
||||
row[type_map["time"]],
|
||||
row[type_map["rank_type"]],
|
||||
row[type_map["id"]],
|
||||
)
|
||||
)
|
||||
else:
|
||||
for gacha_type in fxq_sheets:
|
||||
ws = wb[fxq_sheets[gacha_type]]
|
||||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||||
if row[0] is None:
|
||||
break
|
||||
data.list.append(from_fxq(gacha_type, row[2], row[1], row[0], row[3], row[6]))
|
||||
|
||||
return json.loads(data.json())
|
174
modules/gacha_log/models.py
Normal file
174
modules/gacha_log/models.py
Normal file
@ -0,0 +1,174 @@
|
||||
import datetime
|
||||
from enum import Enum
|
||||
from typing import List, Dict, Union, Any
|
||||
|
||||
from pydantic import BaseModel, validator
|
||||
|
||||
from metadata.shortname import roleToId, weaponToId, not_real_roles
|
||||
from modules.gacha_log.const import UIGF_VERSION
|
||||
|
||||
|
||||
class ImportType(Enum):
|
||||
TGPaimonBot = "TGPaimonBot"
|
||||
PAIMONMOE = "PAIMONMOE"
|
||||
FXQ = "FXQ"
|
||||
UIGF = "UIGF"
|
||||
UNKNOWN = "UNKNOWN"
|
||||
|
||||
|
||||
class FiveStarItem(BaseModel):
|
||||
name: str
|
||||
icon: str
|
||||
count: int
|
||||
type: str
|
||||
isUp: bool
|
||||
isBig: bool
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class FourStarItem(BaseModel):
|
||||
name: str
|
||||
icon: str
|
||||
count: int
|
||||
type: str
|
||||
time: datetime.datetime
|
||||
|
||||
|
||||
class GachaItem(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
gacha_type: str
|
||||
item_type: str
|
||||
rank_type: str
|
||||
time: datetime.datetime
|
||||
|
||||
@validator("name")
|
||||
def name_validator(cls, v):
|
||||
if item_id := (roleToId(v) or weaponToId(v)):
|
||||
if item_id not in not_real_roles:
|
||||
return v
|
||||
raise ValueError("Invalid name")
|
||||
|
||||
@validator("gacha_type")
|
||||
def check_gacha_type(cls, v):
|
||||
if v not in {"100", "200", "301", "302", "400"}:
|
||||
raise ValueError("gacha_type must be 200, 301, 302 or 400")
|
||||
return v
|
||||
|
||||
@validator("item_type")
|
||||
def check_item_type(cls, item):
|
||||
if item not in {"角色", "武器"}:
|
||||
raise ValueError("error item type")
|
||||
return item
|
||||
|
||||
@validator("rank_type")
|
||||
def check_rank_type(cls, rank):
|
||||
if rank not in {"5", "4", "3"}:
|
||||
raise ValueError("error rank type")
|
||||
return rank
|
||||
|
||||
|
||||
class GachaLogInfo(BaseModel):
|
||||
user_id: str
|
||||
uid: str
|
||||
update_time: datetime.datetime
|
||||
import_type: str = ""
|
||||
item_list: Dict[str, List[GachaItem]] = {
|
||||
"角色祈愿": [],
|
||||
"武器祈愿": [],
|
||||
"常驻祈愿": [],
|
||||
"新手祈愿": [],
|
||||
}
|
||||
|
||||
@property
|
||||
def get_import_type(self) -> ImportType:
|
||||
try:
|
||||
return ImportType(self.import_type)
|
||||
except ValueError:
|
||||
return ImportType.UNKNOWN
|
||||
|
||||
|
||||
class Pool:
|
||||
def __init__(self, five: List[str], four: List[str], name: str, to: str, **kwargs):
|
||||
self.five = five
|
||||
self.real_name = name
|
||||
self.name = "、".join(self.five)
|
||||
self.four = four
|
||||
self.from_ = kwargs.get("from")
|
||||
self.to = to
|
||||
self.from_time = datetime.datetime.strptime(self.from_, "%Y-%m-%d %H:%M:%S")
|
||||
self.to_time = datetime.datetime.strptime(self.to, "%Y-%m-%d %H:%M:%S")
|
||||
self.start = self.from_time
|
||||
self.start_init = False
|
||||
self.end = self.to_time
|
||||
self.dict = {}
|
||||
self.count = 0
|
||||
|
||||
def parse(self, item: Union[FiveStarItem, FourStarItem]):
|
||||
if self.from_time <= item.time <= self.to_time:
|
||||
if self.dict.get(item.name):
|
||||
self.dict[item.name]["count"] += 1
|
||||
else:
|
||||
self.dict[item.name] = {
|
||||
"name": item.name,
|
||||
"icon": item.icon,
|
||||
"count": 1,
|
||||
"rank_type": 5 if isinstance(item, FiveStarItem) else 4,
|
||||
}
|
||||
|
||||
def count_item(self, item: List[GachaItem]):
|
||||
for i in item:
|
||||
if self.from_time <= i.time <= self.to_time:
|
||||
self.count += 1
|
||||
if not self.start_init:
|
||||
self.start = i.time
|
||||
self.end = i.time
|
||||
|
||||
def to_list(self):
|
||||
return list(self.dict.values())
|
||||
|
||||
|
||||
class ItemType(Enum):
|
||||
CHARACTER = "角色"
|
||||
WEAPON = "武器"
|
||||
|
||||
|
||||
class UIGFGachaType(Enum):
|
||||
BEGINNER = "100"
|
||||
STANDARD = "200"
|
||||
CHARACTER = "301"
|
||||
WEAPON = "302"
|
||||
CHARACTER2 = "400"
|
||||
|
||||
|
||||
class UIGFItem(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
count: str = "1"
|
||||
gacha_type: UIGFGachaType
|
||||
item_id: str = ""
|
||||
item_type: ItemType
|
||||
rank_type: str
|
||||
time: str
|
||||
uigf_gacha_type: UIGFGachaType
|
||||
|
||||
|
||||
class UIGFInfo(BaseModel):
|
||||
uid: str = "0"
|
||||
lang: str = "zh-cn"
|
||||
export_time: str = ""
|
||||
export_timestamp: int = 0
|
||||
export_app: str = ""
|
||||
export_app_version: str = ""
|
||||
uigf_version: str = UIGF_VERSION
|
||||
|
||||
def __init__(self, **data: Any):
|
||||
super().__init__(**data)
|
||||
if not self.export_time:
|
||||
self.export_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.export_timestamp = int(datetime.datetime.now().timestamp())
|
||||
|
||||
|
||||
class UIGFModel(BaseModel):
|
||||
info: UIGFInfo
|
||||
list: List[UIGFItem]
|
@ -1,13 +1,10 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from io import BytesIO
|
||||
from os import sep
|
||||
|
||||
import genshin
|
||||
from aiofiles import open as async_open
|
||||
from genshin.models import BannerType
|
||||
from modules.apihelper.gacha_log import GachaLog as GachaLogService
|
||||
from openpyxl import load_workbook
|
||||
from telegram import Update, User, Message, Document, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram.constants import ChatAction
|
||||
from telegram.ext import CallbackContext, CommandHandler, MessageHandler, filters, ConversationHandler
|
||||
@ -20,8 +17,20 @@ from core.plugin import Plugin, handler, conversation
|
||||
from core.template import TemplateService
|
||||
from core.user import UserService
|
||||
from core.user.error import UserNotFoundError
|
||||
from metadata.scripts.paimon_moe import update_paimon_moe_zh
|
||||
from modules.apihelper.hyperion import SignIn
|
||||
from modules.gacha_log.error import (
|
||||
GachaLogInvalidAuthkey,
|
||||
PaimonMoeGachaLogFileError,
|
||||
GachaLogFileError,
|
||||
GachaLogNotFound,
|
||||
GachaLogAccountNotFound,
|
||||
GachaLogMixedProvider,
|
||||
)
|
||||
from modules.gacha_log.helpers import from_url_get_authkey
|
||||
from modules.gacha_log.log import GachaLog
|
||||
from utils.bot import get_all_args
|
||||
from utils.const import PROJECT_ROOT
|
||||
from utils.decorators.admins import bot_admins_rights_check
|
||||
from utils.decorators.error import error_callable
|
||||
from utils.decorators.restricts import restricts
|
||||
@ -29,10 +38,13 @@ from utils.helpers import get_genshin_client
|
||||
from utils.log import logger
|
||||
from utils.models.base import RegionEnum
|
||||
|
||||
GACHA_LOG_PATH = PROJECT_ROOT.joinpath("data", "apihelper", "gacha_log")
|
||||
GACHA_LOG_PAIMON_MOE_PATH = PROJECT_ROOT.joinpath("metadata/data/paimon_moe_zh.json")
|
||||
GACHA_LOG_PATH.mkdir(parents=True, exist_ok=True)
|
||||
INPUT_URL, INPUT_FILE, CONFIRM_DELETE = range(10100, 10103)
|
||||
|
||||
|
||||
class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
class GachaLogPlugin(Plugin.Conversation, BasePlugin.Conversation):
|
||||
"""抽卡记录导入/导出/分析"""
|
||||
|
||||
def __init__(
|
||||
@ -46,20 +58,17 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
self.user_service = user_service
|
||||
self.assets_service = assets
|
||||
self.cookie_service = cookie_service
|
||||
self.zh_dict = None
|
||||
self.gacha_log = GachaLog(GACHA_LOG_PATH)
|
||||
|
||||
@staticmethod
|
||||
def from_url_get_authkey(url: str) -> str:
|
||||
"""从 UEL 解析 authkey
|
||||
:param url: URL
|
||||
:return: authkey
|
||||
"""
|
||||
try:
|
||||
return url.split("authkey=")[1].split("&")[0]
|
||||
except IndexError:
|
||||
return url
|
||||
async def __async_init__(self):
|
||||
await update_paimon_moe_zh(False)
|
||||
async with async_open(GACHA_LOG_PAIMON_MOE_PATH, "r", encoding="utf-8") as load_f:
|
||||
self.zh_dict = json.loads(await load_f.read())
|
||||
|
||||
@staticmethod
|
||||
async def _refresh_user_data(user: User, data: dict = None, authkey: str = None, verify_uid: bool = True) -> str:
|
||||
async def _refresh_user_data(
|
||||
self, user: User, data: dict = None, authkey: str = None, verify_uid: bool = True
|
||||
) -> str:
|
||||
"""刷新用户数据
|
||||
:param user: 用户
|
||||
:param data: 数据
|
||||
@ -70,153 +79,25 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
logger.debug("尝试获取已绑定的原神账号")
|
||||
client = await get_genshin_client(user.id, need_cookie=False)
|
||||
if authkey:
|
||||
return await GachaLogService.get_gacha_log_data(user.id, client, authkey)
|
||||
new_num = await self.gacha_log.get_gacha_log_data(user.id, client, authkey)
|
||||
return "更新完成,本次没有新增数据" if new_num == 0 else f"更新完成,本次共新增{new_num}条抽卡记录"
|
||||
if data:
|
||||
return await GachaLogService.import_gacha_log_data(user.id, client, data, verify_uid)
|
||||
new_num = await self.gacha_log.import_gacha_log_data(user.id, client, data, verify_uid)
|
||||
return "更新完成,本次没有新增数据" if new_num == 0 else f"更新完成,本次共新增{new_num}条抽卡记录"
|
||||
except GachaLogNotFound:
|
||||
return "派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~"
|
||||
except GachaLogAccountNotFound:
|
||||
return "导入失败,可能文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同"
|
||||
except GachaLogFileError:
|
||||
return "导入失败,数据格式错误"
|
||||
except GachaLogInvalidAuthkey:
|
||||
return "更新数据失败,authkey 无效"
|
||||
except GachaLogMixedProvider:
|
||||
return "导入失败,你已经通过其他方式导入过抽卡记录了,本次无法导入"
|
||||
except UserNotFoundError:
|
||||
logger.info(f"未查询到用户({user.full_name} {user.id}) 所绑定的账号信息")
|
||||
return "派蒙没有找到您所绑定的账号信息,请先私聊派蒙绑定账号"
|
||||
|
||||
@staticmethod
|
||||
def convert_paimonmoe_to_uigf(data: BytesIO) -> dict:
|
||||
"""转换 paimone.moe 或 非小酋 导出 xlsx 数据为 UIGF 格式
|
||||
:param data: paimon.moe 导出的 xlsx 数据
|
||||
:return: UIGF 格式数据
|
||||
"""
|
||||
PAIMONMOE_VERSION = 3
|
||||
PM2UIGF_VERSION = 1
|
||||
PM2UIGF_NAME = "paimon_moe_to_uigf"
|
||||
UIGF_VERSION = "v2.2"
|
||||
|
||||
with open(f"resources{sep}json{sep}zh.json", "r") as load_f:
|
||||
zh_dict = json.load(load_f)
|
||||
|
||||
class XlsxType(Enum):
|
||||
PAIMONMOE = 1
|
||||
FXQ = 2
|
||||
|
||||
class ItemType(Enum):
|
||||
CHARACTER = "角色"
|
||||
WEAPON = "武器"
|
||||
|
||||
class UIGFGachaType(Enum):
|
||||
BEGINNER = 100
|
||||
STANDARD = 200
|
||||
CHARACTER = 301
|
||||
WEAPON = 302
|
||||
|
||||
class Qiyr:
|
||||
def __init__(
|
||||
self, uigf_gacha_type: UIGFGachaType, item_type: ItemType, name: str, time: datetime, p: int, _id: int
|
||||
) -> None:
|
||||
self.uigf_gacha_type = uigf_gacha_type
|
||||
self.item_type = item_type
|
||||
self.name = name
|
||||
self.time = time
|
||||
self.rank_type = p
|
||||
self.id = _id
|
||||
|
||||
def qy2_json(self):
|
||||
return {
|
||||
"gacha_type": self.uigf_gacha_type.value, # 注意!
|
||||
"item_id": "",
|
||||
"count": -1,
|
||||
"time": self.time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"name": self.name,
|
||||
"item_type": self.item_type.value,
|
||||
"rank_type": self.rank_type,
|
||||
"id": self.id,
|
||||
"uigf_gacha_type": self.uigf_gacha_type.value,
|
||||
}
|
||||
|
||||
def from_paimon_moe(uigf_gacha_type: UIGFGachaType, item_type: str, name: str, time: str, p: int) -> Qiyr:
|
||||
item_type = ItemType.CHARACTER if item_type == "Character" else ItemType.WEAPON
|
||||
name = zh_dict[name]
|
||||
|
||||
time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
|
||||
return Qiyr(uigf_gacha_type, item_type, name, time, p, 0)
|
||||
|
||||
def from_fxq(uigf_gacha_type: UIGFGachaType, item_type: str, name: str, time: str, p: int, _id: int) -> Qiyr:
|
||||
item_type = ItemType.CHARACTER if item_type == "角色" else ItemType.WEAPON
|
||||
time = datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
|
||||
return Qiyr(uigf_gacha_type, item_type, name, time, p, _id)
|
||||
|
||||
class uigf:
|
||||
qiyes: list[Qiyr]
|
||||
uid: int
|
||||
export_time: datetime
|
||||
export_app: str = PM2UIGF_NAME
|
||||
export_app_version: str = PM2UIGF_VERSION
|
||||
uigf_version = UIGF_VERSION
|
||||
lang = "zh-cn"
|
||||
|
||||
def __init__(self, qiyes: list[Qiyr], uid: int, export_time: datetime) -> None:
|
||||
self.uid = uid
|
||||
self.qiyes = qiyes
|
||||
self.qiyes.sort(key=lambda x: x.time)
|
||||
if self.qiyes[0].id == 0: # 如果是从paimon.moe导入的,那么就给id赋值
|
||||
for index, _ in enumerate(self.qiyes):
|
||||
self.qiyes[index].id = index + 1
|
||||
self.export_time = export_time
|
||||
self.export_time = export_time
|
||||
|
||||
def export_json(self) -> dict:
|
||||
json_d = {
|
||||
"info": {
|
||||
"uid": self.uid,
|
||||
"lang": self.lang,
|
||||
"export_time": self.export_time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"export_timestamp": self.export_time.timestamp(),
|
||||
"export_app": self.export_app,
|
||||
"export_app_version": self.export_app_version,
|
||||
"uigf_version": self.uigf_version,
|
||||
},
|
||||
"list": [],
|
||||
}
|
||||
for qiye in self.qiyes:
|
||||
json_d["list"].append(qiye.qy2_json())
|
||||
return json_d
|
||||
|
||||
wb = load_workbook(data)
|
||||
|
||||
xlsx_type = XlsxType.PAIMONMOE if len(wb.worksheets) == 6 else XlsxType.FXQ # 判断是paimon.moe还是非小酋导出的
|
||||
|
||||
paimonmoe_sheets = {
|
||||
UIGFGachaType.BEGINNER: "Beginners' Wish",
|
||||
UIGFGachaType.STANDARD: "Standard",
|
||||
UIGFGachaType.CHARACTER: "Character Event",
|
||||
UIGFGachaType.WEAPON: "Weapon Event",
|
||||
}
|
||||
fxq_sheets = {
|
||||
UIGFGachaType.BEGINNER: "新手祈愿",
|
||||
UIGFGachaType.STANDARD: "常驻祈愿",
|
||||
UIGFGachaType.CHARACTER: "角色活动祈愿",
|
||||
UIGFGachaType.WEAPON: "武器活动祈愿",
|
||||
}
|
||||
qiyes = []
|
||||
if xlsx_type == XlsxType.PAIMONMOE:
|
||||
ws = wb["Information"]
|
||||
if ws["B2"].value != PAIMONMOE_VERSION:
|
||||
raise Exception("PaimonMoe version not supported")
|
||||
export_time = datetime.strptime(ws["B3"].value, "%Y-%m-%d %H:%M:%S")
|
||||
for gacha_type in paimonmoe_sheets:
|
||||
ws = wb[paimonmoe_sheets[gacha_type]]
|
||||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||||
if row[0] is None:
|
||||
break
|
||||
qiyes.append(from_paimon_moe(gacha_type, row[0], row[1], row[2], row[3]))
|
||||
else:
|
||||
export_time = datetime.now()
|
||||
for gacha_type in fxq_sheets:
|
||||
ws = wb[fxq_sheets[gacha_type]]
|
||||
for row in ws.iter_rows(min_row=2, values_only=True):
|
||||
if row[0] is None:
|
||||
break
|
||||
qiyes.append(from_fxq(gacha_type, row[2], row[1], row[0], row[3], row[6]))
|
||||
|
||||
u = uigf(qiyes, 0, export_time)
|
||||
return u.export_json()
|
||||
|
||||
async def import_from_file(self, user: User, message: Message, document: Document = None) -> None:
|
||||
if not document:
|
||||
document = message.document
|
||||
@ -226,9 +107,11 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
elif document.file_name.endswith(".json"):
|
||||
file_type = "json"
|
||||
else:
|
||||
await message.reply_text("文件格式错误,请发送符合 UIGF 标准的 json 格式的抽卡记录文件或者 paimon.moe、非小酋导出的 xlsx 格式的抽卡记录文件")
|
||||
await message.reply_text("文件格式错误,请发送符合 UIGF 标准的抽卡记录文件或者 paimon.moe、非小酋导出的 xlsx 格式的抽卡记录文件")
|
||||
return
|
||||
if document.file_size > 2 * 1024 * 1024:
|
||||
await message.reply_text("文件过大,请发送小于 2 MB 的文件")
|
||||
return
|
||||
try:
|
||||
data = BytesIO()
|
||||
await (await document.get_file()).download(out=data)
|
||||
@ -236,9 +119,17 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
# bytesio to json
|
||||
data = data.getvalue().decode("utf-8")
|
||||
data = json.loads(data)
|
||||
else:
|
||||
data = self.convert_paimonmoe_to_uigf(data)
|
||||
except UnicodeDecodeError:
|
||||
elif file_type == "xlsx":
|
||||
data = self.gacha_log.convert_xlsx_to_uigf(data, self.zh_dict)
|
||||
except PaimonMoeGachaLogFileError as exc:
|
||||
await message.reply_text(
|
||||
"导入失败,PaimonMoe的抽卡记录当前版本不支持\n" f"支持抽卡记录的版本为 {exc.support_version},你的抽卡记录版本为 {exc.file_version}"
|
||||
)
|
||||
return
|
||||
except GachaLogFileError:
|
||||
await message.reply_text("文件解析失败,请检查文件是否符合 UIGF 标准")
|
||||
return
|
||||
except (KeyError, IndexError, ValueError):
|
||||
await message.reply_text("文件解析失败,请检查文件编码是否正确或符合 UIGF 标准")
|
||||
return
|
||||
except Exception as exc:
|
||||
@ -265,7 +156,7 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
user = update.effective_user
|
||||
args = get_all_args(context)
|
||||
logger.info(f"用户 {user.full_name}[{user.id}] 导入抽卡记录命令请求")
|
||||
authkey = self.from_url_get_authkey(args[0] if args else "")
|
||||
authkey = from_url_get_authkey(args[0] if args else "")
|
||||
if not args:
|
||||
if message.document:
|
||||
await self.import_from_file(user, message)
|
||||
@ -299,7 +190,8 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
await message.reply_text(
|
||||
"<b>开始导入祈愿历史记录:请通过 https://paimon.moe/wish/import 获取抽卡记录链接后发送给我"
|
||||
"(非 paimon.moe 导出的文件数据)</b>\n\n"
|
||||
"> 你还可以向派蒙发送从其他工具导出的 UIGF JSON 标准的记录文件\n"
|
||||
"> 你还可以向派蒙发送从其他工具导出的 UIGF 标准的记录文件\n"
|
||||
"> 或者从 paimon.moe 、非小酋 导出的 xlsx 记录文件\n"
|
||||
"> 在绑定 Cookie 时添加 stoken 可能有特殊效果哦(仅限国服)\n"
|
||||
"<b>注意:导入的数据将会与旧数据进行合并。</b>",
|
||||
parse_mode="html",
|
||||
@ -324,7 +216,7 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
if message.document:
|
||||
await self.import_from_file(user, message)
|
||||
return ConversationHandler.END
|
||||
authkey = self.from_url_get_authkey(message.text)
|
||||
authkey = from_url_get_authkey(message.text)
|
||||
reply = await message.reply_text("小派蒙正在从米哈游服务器获取数据,请稍后")
|
||||
await message.reply_chat_action(ChatAction.TYPING)
|
||||
text = await self._refresh_user_data(user, authkey=authkey)
|
||||
@ -346,7 +238,7 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
except UserNotFoundError:
|
||||
await message.reply_text("你还没有导入抽卡记录哦~")
|
||||
return ConversationHandler.END
|
||||
_, status = await GachaLogService.load_history_info(str(user.id), str(client.uid), only_status=True)
|
||||
_, status = await self.gacha_log.load_history_info(str(user.id), str(client.uid), only_status=True)
|
||||
if not status:
|
||||
await message.reply_text("你还没有导入抽卡记录哦~")
|
||||
return ConversationHandler.END
|
||||
@ -361,7 +253,7 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
message = update.effective_message
|
||||
user = update.effective_user
|
||||
if message.text == "确定":
|
||||
status = await GachaLogService.remove_history_info(str(user.id), str(context.chat_data["uid"]))
|
||||
status = await self.gacha_log.remove_history_info(str(user.id), str(context.chat_data["uid"]))
|
||||
await message.reply_text("抽卡记录已删除" if status else "抽卡记录删除失败")
|
||||
return ConversationHandler.END
|
||||
await message.reply_text("已取消")
|
||||
@ -380,12 +272,18 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
if cid < 0:
|
||||
raise ValueError("Invalid cid")
|
||||
client = await get_genshin_client(cid, need_cookie=False)
|
||||
_, status = await GachaLogService.load_history_info(str(cid), str(client.uid), only_status=True)
|
||||
_, status = await self.gacha_log.load_history_info(str(cid), str(client.uid), only_status=True)
|
||||
if not status:
|
||||
await message.reply_text("该用户还没有导入抽卡记录")
|
||||
return
|
||||
status = await GachaLogService.remove_history_info(str(cid), str(client.uid))
|
||||
status = await self.gacha_log.remove_history_info(str(cid), str(client.uid))
|
||||
await message.reply_text("抽卡记录已强制删除" if status else "抽卡记录删除失败")
|
||||
except GachaLogNotFound:
|
||||
await message.reply_text("派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~")
|
||||
except GachaLogAccountNotFound:
|
||||
await message.reply_text("导入失败,可能文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同")
|
||||
except GachaLogFileError:
|
||||
await message.reply_text("导入失败,数据格式错误")
|
||||
except UserNotFoundError:
|
||||
await message.reply_text("该用户暂未绑定账号")
|
||||
except (ValueError, IndexError):
|
||||
@ -402,12 +300,15 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
try:
|
||||
client = await get_genshin_client(user.id, need_cookie=False)
|
||||
await message.reply_chat_action(ChatAction.TYPING)
|
||||
state, text, path = await GachaLogService.gacha_log_to_uigf(str(user.id), str(client.uid))
|
||||
if state:
|
||||
await message.reply_chat_action(ChatAction.UPLOAD_DOCUMENT)
|
||||
await message.reply_document(document=open(path, "rb+"), caption="抽卡记录导出文件")
|
||||
else:
|
||||
await message.reply_text(text)
|
||||
path = await self.gacha_log.gacha_log_to_uigf(str(user.id), str(client.uid))
|
||||
await message.reply_chat_action(ChatAction.UPLOAD_DOCUMENT)
|
||||
await message.reply_document(document=open(path, "rb+"), caption="抽卡记录导出文件 - UIGF V2.2")
|
||||
except GachaLogNotFound:
|
||||
await message.reply_text("派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~")
|
||||
except GachaLogAccountNotFound:
|
||||
await message.reply_text("导入失败,可能文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同")
|
||||
except GachaLogFileError:
|
||||
await message.reply_text("导入失败,数据格式错误")
|
||||
except UserNotFoundError:
|
||||
logger.info(f"未查询到用户({user.full_name} {user.id}) 所绑定的账号信息")
|
||||
if filters.ChatType.GROUPS.filter(message):
|
||||
@ -433,7 +334,7 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
try:
|
||||
client = await get_genshin_client(user.id, need_cookie=False)
|
||||
await message.reply_chat_action(ChatAction.TYPING)
|
||||
data = await GachaLogService.get_analysis(user.id, client, pool_type, self.assets_service)
|
||||
data = await self.gacha_log.get_analysis(user.id, client, pool_type, self.assets_service)
|
||||
if isinstance(data, str):
|
||||
reply_message = await message.reply_text(data)
|
||||
if filters.ChatType.GROUPS.filter(message):
|
||||
@ -445,6 +346,12 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
"genshin/gacha_log/gacha_log.html", data, full_page=True, query_selector=".body_box"
|
||||
)
|
||||
await message.reply_photo(png_data)
|
||||
except GachaLogNotFound:
|
||||
await message.reply_text("派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~")
|
||||
except GachaLogAccountNotFound:
|
||||
await message.reply_text("导入失败,可能文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同")
|
||||
except GachaLogFileError:
|
||||
await message.reply_text("导入失败,数据格式错误")
|
||||
except UserNotFoundError:
|
||||
logger.info(f"未查询到用户({user.full_name} {user.id}) 所绑定的账号信息")
|
||||
if filters.ChatType.GROUPS.filter(message):
|
||||
@ -476,7 +383,7 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
client = await get_genshin_client(user.id, need_cookie=False)
|
||||
group = filters.ChatType.GROUPS.filter(message)
|
||||
await message.reply_chat_action(ChatAction.TYPING)
|
||||
data = await GachaLogService.get_pool_analysis(user.id, client, pool_type, self.assets_service, group)
|
||||
data = await self.gacha_log.get_pool_analysis(user.id, client, pool_type, self.assets_service, group)
|
||||
if isinstance(data, str):
|
||||
reply_message = await message.reply_text(data)
|
||||
if filters.ChatType.GROUPS.filter(message):
|
||||
@ -496,6 +403,12 @@ class GachaLog(Plugin.Conversation, BasePlugin.Conversation):
|
||||
else:
|
||||
await message.reply_chat_action(ChatAction.UPLOAD_PHOTO)
|
||||
await message.reply_photo(png_data)
|
||||
except GachaLogNotFound:
|
||||
await message.reply_text("派蒙没有找到你的抽卡记录,快来私聊派蒙导入吧~")
|
||||
except GachaLogAccountNotFound:
|
||||
await message.reply_text("导入失败,可能文件包含的祈愿记录所属 uid 与你当前绑定的 uid 不同")
|
||||
except GachaLogFileError:
|
||||
await message.reply_text("导入失败,数据格式错误")
|
||||
except (UserNotFoundError, CookiesNotFoundError):
|
||||
logger.info(f"未查询到用户({user.full_name} {user.id}) 所绑定的账号信息")
|
||||
if filters.ChatType.GROUPS.filter(message):
|
||||
|
@ -11,7 +11,7 @@ from core.plugin import Plugin, handler
|
||||
from core.sign import SignServices
|
||||
from core.user import UserService
|
||||
from core.user.error import UserNotFoundError
|
||||
from modules.apihelper.gacha_log import GachaLog
|
||||
from modules.gacha_log.log import GachaLog
|
||||
from utils.bot import get_all_args
|
||||
from utils.decorators.admins import bot_admins_rights_check
|
||||
from utils.helpers import get_genshin_client
|
||||
|
@ -3,6 +3,7 @@ from telegram import Update
|
||||
from core.plugin import Plugin, handler
|
||||
from metadata.scripts.honey import update_honey_metadata
|
||||
from metadata.scripts.metadatas import update_metadata_from_ambr, update_metadata_from_github
|
||||
from metadata.scripts.paimon_moe import update_paimon_moe_zh
|
||||
from utils.decorators.admins import bot_admins_rights_check
|
||||
from utils.log import logger
|
||||
|
||||
@ -19,6 +20,7 @@ class MetadataPlugin(Plugin):
|
||||
msg = await message.reply_text("正在刷新元数据,请耐心等待...")
|
||||
logger.info("正在从 github 上获取元数据")
|
||||
await update_metadata_from_github()
|
||||
await update_paimon_moe_zh()
|
||||
logger.info("正在从 ambr 上获取元数据")
|
||||
await update_metadata_from_ambr()
|
||||
logger.info("正在从 honey 上获取元数据")
|
||||
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue
Block a user