支持查询 fanbox 发帖、用户

This commit is contained in:
xtaodada 2023-01-12 22:56:04 +08:00
parent 4a0ae1500d
commit 43bb361526
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
7 changed files with 270 additions and 6 deletions

View File

@ -11,7 +11,7 @@ from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from defs.browser import get_browser
from headers import headers
from headers import bili_headers
def cut_text(old_str, cut):
@ -205,7 +205,7 @@ def binfo_image_create(video_info: dict):
up_mid = up["mid"]
up_data = httpx.get(
f"https://api.bilibili.com/x/space/acc/info?mid={up_mid}",
headers=headers,
headers=bili_headers,
).json()
up_list.append(
{
@ -223,11 +223,11 @@ def binfo_image_create(video_info: dict):
up_mid = video_info["data"]["owner"]["mid"]
up_data = httpx.get(
f"https://api.bilibili.com/x/space/acc/info?mid={up_mid}",
headers=headers,
headers=bili_headers,
).json()
up_stat = httpx.get(
f"https://api.bilibili.com/x/relation/stat?vmid={up_mid}",
headers=headers,
headers=bili_headers,
).json()
up_list = [
{

138
defs/fanbox.py Normal file
View File

@ -0,0 +1,138 @@
import re
from typing import Tuple, Optional, Union
from pyrogram.enums import ParseMode
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from headers import FANBOX_HEADERS
from models.apis.fanbox import User as FanboxUser, Post as FanboxPost
from init import request
FANBOX_USER_API = "https://api.fanbox.cc/creator.get"
FANBOX_POST_API = "https://api.fanbox.cc/post.info"
async def get_fanbox_user(username: str) -> FanboxUser:
params = {
"creatorId": username,
}
req = await request.get(FANBOX_USER_API, params=params, headers=FANBOX_HEADERS)
assert req.status_code == 200
return FanboxUser(**(req.json()["body"]))
async def get_fanbox_post(post_id: str) -> FanboxPost:
params = {
"postId": post_id,
}
req = await request.get(FANBOX_POST_API, params=params, headers=FANBOX_HEADERS)
assert req.status_code == 200
return FanboxPost(**(req.json()["body"]))
def parse_username_and_post(url: str) -> Tuple[Optional[str], Optional[str]]:
# https://www.fanbox.cc/@username/posts/post_id
username, post_id = None, None
if username_temp := re.findall(r"fanbox.cc/@(.+?)/", url):
username = username_temp[0]
elif username_temp := re.findall(r"//(.+?).fanbox.cc", url):
if username_temp[0] != "www":
username = username_temp[0]
for i in url.split("/posts/")[1:]:
if post_id_temp := re.findall(r"(\d+)", i):
post_id = post_id_temp[0]
break
return username, post_id
async def check_kemono_party(model: Union[FanboxPost, FanboxUser]) -> bool:
req = await request.get(model.kemono_url)
return req.status_code == 200
async def gen_post_button(post: FanboxPost) -> InlineKeyboardMarkup:
l1 = [
InlineKeyboardButton(text="Source", url=post.url),
InlineKeyboardButton(text="Author", url=post.user_url),
]
if post.coverImageUrl:
l1.insert(1, InlineKeyboardButton(text="Origin", url=post.coverImageUrl))
l2 = [
InlineKeyboardButton(text="Kemono", url=post.kemono_url),
]
data = [l1, l2] if await check_kemono_party(post) else [l1]
return InlineKeyboardMarkup(data)
async def gen_user_button(user: FanboxUser) -> InlineKeyboardMarkup:
l1 = [
InlineKeyboardButton(text="Author", url=user.url),
]
if user.coverImageUrl:
l1.insert(0, InlineKeyboardButton(text="Origin", url=user.coverImageUrl))
l2 = [
InlineKeyboardButton(text="Kemono", url=user.kemono_url),
]
data = [l1, l2] if await check_kemono_party(user) else [l1]
return InlineKeyboardMarkup(data)
async def parse_fanbox_post(url: str, message: Message):
_, post_id = parse_username_and_post(url)
if not post_id:
return
try:
post: FanboxPost = await get_fanbox_post(post_id)
except AssertionError:
return
if post.coverImageUrl:
await message.reply_photo(
post.coverImageUrl,
caption=post.text,
parse_mode=ParseMode.HTML,
reply_markup=await gen_post_button(post),
quote=True,
)
else:
await message.reply_text(
post.text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=await gen_post_button(post),
quote=True,
)
async def parse_fanbox_user(url: str, message: Message) -> None:
username, _ = parse_username_and_post(url)
if not username:
return
try:
user: FanboxUser = await get_fanbox_user(username)
except AssertionError:
return
if user.coverImageUrl:
await message.reply_photo(
user.coverImageUrl,
caption=user.text,
parse_mode=ParseMode.HTML,
reply_markup=await gen_user_button(user),
quote=True,
)
else:
await message.reply_text(
user.text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_markup=await gen_user_button(user),
quote=True,
)
async def parse_fanbox_url(url: str, message: Message) -> None:
if "/posts/" in url:
await parse_fanbox_post(url, message)
else:
await parse_fanbox_user(url, message)

View File

@ -1,3 +1,20 @@
headers = {
bili_headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36"
}
FANBOX_HEADERS = {
"authority": "api.fanbox.cc",
"accept": "application/json, text/plain, */*",
"accept-language": "zh-CN,zh;q=0.9,zh-Hans;q=0.8,und;q=0.7,en;q=0.6,zh-Hant;q=0.5,ja;q=0.4",
"dnt": "1",
"origin": "https://www.fanbox.cc",
"referer": "https://www.fanbox.cc/",
"sec-ch-ua": '"Chromium";v="108", "Not?A_Brand";v="8"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-site",
"sec-gpc": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 "
"Safari/537.36",
}

View File

@ -4,9 +4,10 @@ from init import logs, user_me, bot, sqlite
if __name__ == "__main__":
logs.info(f"@{user_me.username} 运行成功!")
logs.info(f"@{user_me.username} 连接服务器中。。。")
bot.start()
bot.loop.create_task(sqlite.create_db_and_tables())
logs.info(f"@{user_me.username} 运行成功!")
idle()
bot.stop()
sqlite.stop()

83
models/apis/fanbox.py Normal file
View File

@ -0,0 +1,83 @@
import datetime
from typing import Optional
from pydantic import BaseModel
class LiteUser(BaseModel):
name: str
userId: str
iconUrl: Optional[str]
class User(BaseModel):
coverImageUrl: Optional[str]
creatorId: str
description: str
hasAdultContent: bool
user: LiteUser
...
@property
def url(self) -> str:
return f"https://{self.creatorId}.fanbox.cc"
@property
def kemono_url(self) -> str:
return f"https://kemono.party/fanbox/user/{self.user.userId}"
@property
def name(self) -> str:
return f"🔞 {self.user.name}" if self.hasAdultContent else self.user.name
@property
def text(self) -> str:
return (
f"<b>Fanbox User Info</b>\n\n"
f"Name: <code>{self.user.name}</code>\n"
f'Username: <a href="{self.url}">{self.creatorId}</a>\n'
f"Bio: <code>{self.description.strip()}</code>"
)
class Post(BaseModel):
id: str
coverImageUrl: Optional[str]
creatorId: str
excerpt: str
feeRequired: int
likeCount: int
publishedDatetime: str
title: str
user: LiteUser
...
@property
def url(self) -> str:
return f"{self.user_url}posts/{self.id}"
@property
def kemono_url(self) -> str:
return f"https://kemono.party/fanbox/user/{self.user.userId}/post/{self.id}"
@property
def user_url(self) -> str:
return f"https://{self.creatorId}.fanbox.cc/"
@property
def create_time(self) -> str:
# 2022-10-05T20:21:19+09:00
jp_time = datetime.datetime.strptime(
self.publishedDatetime, "%Y-%m-%dT%H:%M:%S%z"
)
cn_time = jp_time.astimezone(datetime.timezone(datetime.timedelta(hours=8)))
return cn_time.strftime("%Y-%m-%d %H:%M:%S")
@property
def text(self) -> str:
return (
f"<b>Fanbox Post Info</b>\n\n"
f"<code>{self.excerpt.strip()}</code>\n\n"
f'<a href="{self.user_url}">{self.user.name}</a> 发表于 {self.create_time}\n'
f"❤️ {self.likeCount}"
)

24
modules/fanbox.py Normal file
View File

@ -0,0 +1,24 @@
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType
from pyrogram.types import Message
from defs.fanbox import parse_fanbox_url
@Client.on_message(filters.incoming & filters.text & filters.regex(r"fanbox.cc"))
async def fanbox_check(_: Client, message: Message):
if not message.text:
return
try:
for num in range(len(message.entities)):
entity = message.entities[num]
if entity.type == MessageEntityType.URL:
url = message.text[entity.offset : entity.offset + entity.length]
elif entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
else:
continue
await parse_fanbox_url(url, message)
except Exception as e:
print(e)
raise ContinuePropagation

View File

@ -7,6 +7,7 @@ des = """本机器人特性:
解析 bilibili 视频动态
解析 twitter 推文用户
解析 lofter 日志用户
解析 fanbox 发帖用户
汇率查询
复读机3
答案之书