MibooGram/modules/gacha_log/online_view.py

82 lines
2.6 KiB
Python
Raw Normal View History

2024-09-08 15:11:32 +00:00
from pathlib import Path
from typing import Optional
import httpx
from httpx import URL
from httpx import HTTPError
from telegram import InlineKeyboardMarkup, InlineKeyboardButton
from telegram.helpers import create_deep_linked_url
2024-11-30 14:11:02 +00:00
from gram_core.basemodel import Settings, SettingsConfigDict
2024-09-08 15:11:32 +00:00
from modules.gacha_log.error import GachaLogWebNotConfigError, GachaLogWebUploadError, GachaLogNotFound
class GachaLogWebConfig(Settings):
"""调频记录在线查询配置"""
2024-09-08 15:11:32 +00:00
url: Optional[str] = ""
token: Optional[str] = ""
2024-11-30 14:11:02 +00:00
model_config = SettingsConfigDict(env_prefix="gacha_log_web_")
2024-09-08 15:11:32 +00:00
gacha_log_web_config = GachaLogWebConfig()
DEFAULT_POOL = "代理人调频"
class GachaLogOnlineView:
"""调频记录在线查询"""
2024-09-08 15:11:32 +00:00
gacha_log_path: Path
@staticmethod
def get_web_upload_button(bot_username: str):
if not gacha_log_web_config.url:
return None
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
">> 查询详细信息 <<", url=create_deep_linked_url(bot_username, "gacha_log_online_view")
)
]
]
)
async def web_upload(self, user_id: str, uid: str) -> str:
if not gacha_log_web_config.url:
raise GachaLogWebNotConfigError
file_path = self.gacha_log_path / f"{user_id}-{uid}.json"
if not file_path.exists():
raise GachaLogNotFound
async with httpx.AsyncClient() as client:
with open(file_path, "rb") as file:
try:
req = await client.post(
URL(gacha_log_web_config.url).join("upload"),
files={"file": file},
data={
"token": gacha_log_web_config.token,
"uid": uid,
"game": "zzz",
},
)
req.raise_for_status()
except HTTPError as e:
raise GachaLogWebUploadError from e
account_id = req.json()["account_id"]
url = (
URL(gacha_log_web_config.url)
.join("gacha_log")
.copy_merge_params(
{
"account_id": account_id,
"banner_type": DEFAULT_POOL,
"rarities": "3,4,5",
"size": 100,
"page": 1,
}
)
)
return str(url)