支持 MiYouShe Splash

This commit is contained in:
xtaodada 2023-06-23 01:17:14 +08:00
parent c1408b1a31
commit c90c7c41c2
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
7 changed files with 263 additions and 0 deletions

View File

@ -17,6 +17,8 @@ ipv6 = False
admin = 0
lofter_channel = 0
lofter_channel_username = username
splash_channel = 0
splash_channel_username = username
[api]
amap_key = ABCD

View File

@ -11,6 +11,8 @@ ipv6: Union[bool, str] = "False"
admin: int = 0
lofter_channel: int = 0
lofter_channel_username: str = ""
splash_channel: int = 0
splash_channel_username: str = ""
# [api]
amap_key: str = ""
bili_cookie: str = ""
@ -24,6 +26,10 @@ lofter_channel = config.getint("post", "lofter_channel", fallback=lofter_channel
lofter_channel_username = config.get(
"post", "lofter_channel_username", fallback=lofter_channel_username
)
splash_channel = config.getint("post", "splash_channel", fallback=splash_channel)
splash_channel_username = config.get(
"post", "splash_channel_username", fallback=splash_channel_username
)
amap_key = config.get("api", "amap_key", fallback=amap_key)
bili_cookie = config.get("api", "bili_cookie", fallback=bili_cookie)
try:

115
defs/splash.py Normal file
View File

@ -0,0 +1,115 @@
import asyncio
import traceback
from typing import List
from pyrogram.enums import ParseMode
from pyrogram.errors import FloodWait
from defs.glover import splash_channel
from init import bot, request, logger
from models.models.splash import Splash as SplashModel
from models.apis.splash import Splash as SplashApi
from models.splash import SplashService
async def get_splash() -> List[SplashApi]:
data = await request.get("https://bbs-api.miyoushe.com/apihub/api/getAppSplash")
splash_list = []
if data.is_success:
data = data.json()
for i in data["data"]["splashes"]:
splash_list.append(SplashApi(**i))
return splash_list
async def check_splash(splash: SplashApi) -> bool:
if data := await SplashService.get_by_splash_id(splash.id):
if splash.splash_image:
data.splash_image = splash.splash_image
if splash.online_ts:
data.online_ts = splash.online_ts
if splash.offline_ts:
data.offline_ts = splash.offline_ts
if splash.article_url:
data.article_url = splash.article_url
await SplashService.update_splash(data)
return False
return True
def gen_splash(splash: SplashApi) -> SplashModel:
return SplashModel(
id=splash.id,
splash_image=splash.splash_image,
online_ts=splash.online_ts,
offline_ts=splash.offline_ts,
file_id=splash.file_id,
article_url=splash.article_url,
)
def retry(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except FloodWait as e:
logger.warning(f"Sleeping for {e.value}s")
await asyncio.sleep(e.value + 1)
return await func(*args, **kwargs)
return wrapper
@retry
async def send_splash_text(api: SplashApi):
await bot.send_message(
splash_channel,
api.text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
)
@retry
async def send_splash_photo(model: SplashModel):
photo = await bot.send_photo(
splash_channel,
model.splash_image,
)
model.file_id = photo.photo.file_id
@retry
async def send_splash_document(model: SplashModel):
await bot.send_document(
splash_channel,
model.splash_image,
force_document=True,
)
async def send_splash(api: SplashApi, model: SplashModel):
await send_splash_text(api)
await send_splash_photo(model)
await send_splash_document(model)
async def update_splash():
logger.info("Updating splash ...")
try:
data = await get_splash()
except Exception:
traceback.print_exc()
return
for i in data:
if not await check_splash(i):
continue
model = gen_splash(i)
if not model.splash_image:
continue
try:
await send_splash(i, model)
except Exception:
traceback.print_exc()
continue
await SplashService.add_splash(model)
logger.info("Splash updated.")

66
models/apis/splash.py Normal file
View File

@ -0,0 +1,66 @@
import re
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
GAME_ID_MAP = {1: "bh3", 2: "ys", 3: "bh2", 4: "wd", 5: "dby", 6: "sr", 8: "zzz"}
class Splash(BaseModel):
id: int
splash_image: str
app_path: str
online_ts: int
offline_ts: int
game_id: Optional[int] = 0
file_id: Optional[str] = ""
@property
def online_time(self) -> datetime:
return datetime.fromtimestamp(int(self.online_ts))
@property
def online_time_str(self) -> str:
return self.online_time.strftime("%Y-%m-%d %H:%M:%S")
@property
def offline_time(self) -> datetime:
return datetime.fromtimestamp(int(self.offline_ts))
@property
def offline_time_str(self) -> str:
return self.offline_time.strftime("%Y-%m-%d %H:%M:%S")
@property
def article_id(self) -> int:
try:
return int(re.search(r"article/(\d+)", self.app_path).group(1))
except AttributeError:
return 0
@property
def game_short_name(self) -> str:
if not self.game_id:
return ""
return GAME_ID_MAP.get(self.game_id)
@property
def article_url(self) -> str:
if self.app_path.startswith("http"):
return self.app_path
if not self.article_id:
return ""
if not self.game_short_name:
return ""
return f"https://www.miyoushe.com/{self.game_short_name}/article/{self.article_id}"
@property
def text(self) -> str:
return f"#id{self.id} \n" \
f"ID<code>{self.id}</code>\n" \
f"所属分区:<code>{self.game_id} - {self.game_short_name}</code>\n" \
f"开始时间:<code>{self.online_time_str}</code>\n" \
f"结束时间:<code>{self.offline_time_str}</code>\n" \
f"链接: {self.splash_image}\n" \
f"文章链接: {self.article_url}"

12
models/models/splash.py Normal file
View File

@ -0,0 +1,12 @@
from sqlmodel import SQLModel, Field
class Splash(SQLModel, table=True):
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
id: int = Field(primary_key=True)
splash_image: str = Field()
online_ts: int = Field(default=0)
offline_ts: int = Field(default=0)
file_id: str = Field()
article_url: str = Field(default="")

41
models/splash.py Normal file
View File

@ -0,0 +1,41 @@
from typing import cast, Optional, List
from sqlalchemy import select
from sqlmodel.ext.asyncio.session import AsyncSession
from init import sqlite
from models.models.splash import Splash
class SplashService:
@staticmethod
async def get_by_splash_id(splash_id: int) -> Optional[Splash]:
async with sqlite.Session() as session:
session = cast(AsyncSession, session)
check = Splash.id == splash_id
statement = select(Splash).where(check)
results = await session.exec(statement)
return post[0] if (post := results.first()) else None
@staticmethod
async def get_all_splashes() -> List[Optional[Splash]]:
async with sqlite.Session() as session:
session = cast(AsyncSession, session)
statement = select(Splash)
results = await session.exec(statement)
return [item[0] for item in results.all()]
@staticmethod
async def add_splash(splash: Splash):
async with sqlite.Session() as session:
session = cast(AsyncSession, session)
session.add(splash)
await session.commit()
@staticmethod
async def update_splash(splash: Splash):
async with sqlite.Session() as session:
session = cast(AsyncSession, session)
session.add(splash)
await session.commit()
await session.refresh(splash)

21
modules/splash.py Normal file
View File

@ -0,0 +1,21 @@
from pyrogram import Client, filters
from pyrogram.types import Message
from defs.splash import update_splash
from scheduler import scheduler
@Client.on_message(
filters.incoming & filters.command(["splash_update"])
)
async def splash_update(_: Client, message: Message):
"""
更新 splash
"""
await update_splash()
await message.reply("更新成功", quote=True)
@scheduler.scheduled_job("interval", minutes=30, id="splash_update")
async def splash_update_job() -> None:
await update_splash()