feat: parse bsky post

This commit is contained in:
xtaodada 2024-10-20 15:31:44 +08:00
parent cee97a4202
commit 807b4c05b9
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
8 changed files with 527 additions and 0 deletions

View File

@ -26,3 +26,7 @@ bili_cookie = ABCD
bili_auth_user = 777000,111000 bili_auth_user = 777000,111000
bili_auth_chat = 777000,111000 bili_auth_chat = 777000,111000
mys_cookie = ABCD mys_cookie = ABCD
[bsky]
username = 111
password = 11

188
defs/bsky.py Normal file
View File

@ -0,0 +1,188 @@
import asyncio
import traceback
from typing import Optional, TYPE_CHECKING
from atproto_client.exceptions import BadRequestError
from pyrogram.enums import ParseMode
from pyrogram.errors import FloodWait
from pyrogram.types import (
InlineKeyboardMarkup,
InlineKeyboardButton,
InputMediaPhoto,
Message,
)
from init import bot, logs
from models.models.bsky import HumanPost, HumanAuthor
from models.services.bsky import bsky_client
if TYPE_CHECKING:
from modules.bsky_api import Reply
def flood_wait():
def decorator(function):
async def wrapper(*args, **kwargs):
try:
return await function(*args, **kwargs)
except FloodWait as e:
logs.warning(f"遇到 FloodWait等待 {e.value} 秒后重试!")
await asyncio.sleep(e.value + 1)
return await wrapper(*args, **kwargs)
except Exception as e:
traceback.print_exc()
raise e
return wrapper
return decorator
class Timeline:
@staticmethod
def get_button(post: HumanPost) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton("Source", url=post.url),
InlineKeyboardButton("Author", url=post.author.url),
]
]
)
@staticmethod
def get_author_button(author: HumanAuthor) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton("Author", url=author.url),
]
]
)
@staticmethod
def get_media_group(text: str, post: HumanPost) -> list[InputMediaPhoto]:
data = []
images = post.images
for idx, image in enumerate(images):
data.append(
InputMediaPhoto(
image,
caption=text if idx == 0 else None,
parse_mode=ParseMode.HTML,
)
)
return data
@staticmethod
def get_post_text(post: HumanPost) -> str:
text = "<b>Bsky Post Info</b>\n\n<code>"
text += post.content
key = "发表"
if post.is_reply:
key = "回复"
elif post.is_quote:
key = "引用"
text += f"</code>\n\n{post.author.format} {key}{post.time_str}"
return text
@staticmethod
@flood_wait()
async def send_to_user(reply: "Reply", post: HumanPost):
text = Timeline.get_post_text(post)
if post.gif:
return await bot.send_animation(
reply.cid,
post.gif,
caption=text,
reply_to_message_id=reply.mid,
parse_mode=ParseMode.HTML,
reply_markup=Timeline.get_button(post),
)
elif not post.images:
return await bot.send_message(
reply.cid,
text,
disable_web_page_preview=True,
reply_to_message_id=reply.mid,
parse_mode=ParseMode.HTML,
reply_markup=Timeline.get_button(post),
)
elif len(post.images) == 1:
return await bot.send_photo(
reply.cid,
post.images[0],
caption=text,
reply_to_message_id=reply.mid,
parse_mode=ParseMode.HTML,
reply_markup=Timeline.get_button(post),
)
else:
await bot.send_media_group(
reply.cid,
Timeline.get_media_group(text, post),
reply_to_message_id=reply.mid,
)
@staticmethod
def code(text: str) -> str:
return f"<code>{text}</code>"
@staticmethod
def get_author_text(author: HumanAuthor) -> str:
text = "<b>Bsky User Info</b>\n\n"
text += f"Name: {Timeline.code(author.display_name)}\n"
text += f"Username: {author.format_handle}\n"
if author.description:
text += f"Bio: {Timeline.code(author.description)}\n"
text += f"Joined: {Timeline.code(author.time_str)}\n"
if author.posts_count:
text += f"📤 {author.posts_count} "
if author.followers_count:
text += f"粉丝 {author.followers_count} "
if author.follows_count:
text += f"关注 {author.follows_count}"
return text
@staticmethod
@flood_wait()
async def send_user(reply: "Reply", author: HumanAuthor):
text = Timeline.get_author_text(author)
if author.avatar_img:
return await bot.send_photo(
reply.cid,
author.avatar_img,
caption=text,
reply_to_message_id=reply.mid,
parse_mode=ParseMode.HTML,
reply_markup=Timeline.get_author_button(author),
)
else:
return await bot.send_message(
reply.cid,
text,
disable_web_page_preview=True,
reply_to_message_id=reply.mid,
parse_mode=ParseMode.HTML,
reply_markup=Timeline.get_author_button(author),
)
@staticmethod
async def fetch_post(handle: str, cid: str) -> Optional[HumanPost]:
try:
user = await Timeline.fetch_user(handle)
uri = f"at://{user.did}/app.bsky.feed.post/{cid}"
post = await bsky_client.client.get_post_thread(uri)
return HumanPost.parse_thread(post.thread)
except BadRequestError as e:
logs.error(f"bsky Error: {e}")
return None
@staticmethod
async def fetch_user(handle: str) -> Optional[HumanAuthor]:
try:
user = await bsky_client.client.get_profile(handle)
return HumanAuthor.parse_detail(user)
except BadRequestError as e:
logs.error(f"bsky Error: {e}")
return None

View File

@ -48,6 +48,9 @@ try:
except ValueError: except ValueError:
bili_auth_user: List[int] = [] bili_auth_user: List[int] = []
bili_auth_chat: List[int] = [] bili_auth_chat: List[int] = []
# bsky
bsky_username = config.get("bsky", "username", fallback="")
bsky_password = config.get("bsky", "password", fallback="")
try: try:
ipv6 = bool(strtobool(ipv6)) ipv6 = bool(strtobool(ipv6))
except ValueError: except ValueError:

View File

@ -3,12 +3,14 @@ import asyncio
from pyrogram import idle from pyrogram import idle
from init import logs, bot, sqlite from init import logs, bot, sqlite
from models.services.bsky import bsky_client
async def main(): async def main():
logs.info("连接服务器中。。。") logs.info("连接服务器中。。。")
await bot.start() await bot.start()
bot.loop.create_task(sqlite.create_db_and_tables()) bot.loop.create_task(sqlite.create_db_and_tables())
bot.loop.create_task(bsky_client.initialize())
logs.info(f"@{bot.me.username} 运行成功!") logs.info(f"@{bot.me.username} 运行成功!")
await idle() await idle()
await bot.stop() await bot.stop()

198
models/models/bsky.py Normal file
View File

@ -0,0 +1,198 @@
from typing import TYPE_CHECKING, Optional, Union
from datetime import datetime
import pytz
from pydantic import BaseModel
from atproto_client.models.app.bsky.embed.images import View as BskyViewImage
from atproto_client.models.app.bsky.embed.video import View as BskyViewVideo
from atproto_client.models.app.bsky.embed.external import View as BskyViewExternal
from atproto_client.models.app.bsky.embed.record import (
View as BskyViewRecord,
ViewRecord as BskyViewRecordRecord,
)
if TYPE_CHECKING:
from atproto_client.models.app.bsky.feed.defs import (
FeedViewPost,
PostView,
ThreadViewPost,
)
from atproto_client.models.app.bsky.actor.defs import (
ProfileViewBasic,
ProfileViewDetailed,
)
TZ = pytz.timezone("Asia/Shanghai")
class HumanAuthor(BaseModel):
display_name: str
handle: str
did: str
avatar_img: str
created_at: datetime
description: Optional[str] = None
followers_count: Optional[int] = None
follows_count: Optional[int] = None
posts_count: Optional[int] = None
@property
def url(self) -> str:
return f"https://bsky.app/profile/{self.handle}"
@property
def format(self) -> str:
return f'<a href="{self.url}">{self.display_name}</a>'
@property
def format_handle(self) -> str:
return f'<a href="{self.url}">@{self.handle}</a>'
@property
def time_str(self) -> str:
# utc+8
return self.created_at.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S")
@staticmethod
def parse(author: "ProfileViewBasic") -> "HumanAuthor":
return HumanAuthor(
display_name=author.display_name,
handle=author.handle,
did=author.did,
avatar_img=author.avatar,
created_at=author.created_at,
)
@staticmethod
def parse_detail(author: "ProfileViewDetailed") -> "HumanAuthor":
return HumanAuthor(
display_name=author.display_name,
handle=author.handle,
did=author.did,
avatar_img=author.avatar,
created_at=author.created_at,
description=author.description,
followers_count=author.followers_count,
follows_count=author.follows_count,
posts_count=author.posts_count,
)
class HumanPost(BaseModel, frozen=False):
cid: str
content: str
images: Optional[list[str]] = None
gif: Optional[str] = None
video: Optional[str] = None
external: Optional[str] = None
created_at: datetime
like_count: int
quote_count: int
reply_count: int
repost_count: int
uri: str
author: HumanAuthor
is_quote: bool = False
is_reply: bool = False
is_repost: bool = False
parent_post: "HumanPost" = None
@property
def url(self) -> str:
return self.author.url + "/post/" + self.uri.split("/")[-1]
@property
def time_str(self) -> str:
# utc+8
return self.created_at.astimezone(TZ).strftime("%Y-%m-%d %H:%M:%S")
@staticmethod
def parse_view(post: Union["PostView", "BskyViewRecordRecord"]) -> "HumanPost":
record = post.value if isinstance(post, BskyViewRecordRecord) else post.record
embed = (
(post.embeds[0] if post.embeds else None)
if isinstance(post, BskyViewRecordRecord)
else post.embed
)
content = record.text
created_at = record.created_at
# images
images = []
if isinstance(embed, BskyViewImage):
for image in embed.images:
images.append(image.fullsize)
# video
video = None
if isinstance(embed, BskyViewVideo):
video = embed.playlist # m3u8
# gif
gif, extra = None, None
if isinstance(embed, BskyViewExternal):
uri = embed.external.uri
if ".gif" in uri:
gif = uri
else:
extra = uri
# author
author = HumanAuthor.parse(post.author)
return HumanPost(
cid=post.cid,
content=content,
images=images,
gif=gif,
video=video,
external=extra,
created_at=created_at,
like_count=post.like_count,
quote_count=post.quote_count,
reply_count=post.reply_count,
repost_count=post.repost_count,
uri=post.uri,
author=author,
)
@staticmethod
def parse(data: "FeedViewPost") -> "HumanPost":
base = HumanPost.parse_view(data.post)
is_quote, is_reply, is_repost = False, False, False
parent_post = None
if data.reply:
is_reply = True
parent_post = HumanPost.parse_view(data.reply.parent)
elif data.reason:
is_repost = True
elif data.post.embed and isinstance(data.post.embed, BskyViewRecord):
is_quote = True
if isinstance(data.post.embed.record, BskyViewRecordRecord):
parent_post = HumanPost.parse_view(data.post.embed.record)
base.is_quote = is_quote
base.is_reply = is_reply
base.is_repost = is_repost
base.parent_post = parent_post
return base
@staticmethod
def parse_thread(data: "ThreadViewPost") -> "HumanPost":
base = HumanPost.parse_view(data.post)
is_quote, is_reply, is_repost = False, False, False
parent_post = None
if data.parent:
is_reply = True
parent_post = HumanPost.parse_view(data.parent.post)
elif data.post.embed and isinstance(data.post.embed, BskyViewRecord):
is_quote = True
if isinstance(data.post.embed.record, BskyViewRecordRecord):
parent_post = HumanPost.parse_view(data.post.embed.record)
base.is_quote = is_quote
base.is_reply = is_reply
base.is_repost = is_repost
base.parent_post = parent_post
return base

60
models/services/bsky.py Normal file
View File

@ -0,0 +1,60 @@
from pathlib import Path
from atproto import AsyncClient
from atproto.exceptions import BadRequestError
from typing import Optional
from atproto_client import Session, SessionEvent
from defs.glover import bsky_username, bsky_password
from init import logs
DATA_PATH = Path("data")
class SessionReuse:
def __init__(self):
self.session_file = DATA_PATH / "session.txt"
def get_session(self) -> Optional[str]:
try:
with open(self.session_file, encoding="UTF-8") as f:
return f.read()
except FileNotFoundError:
return None
def save_session(self, session_str) -> None:
with open(self.session_file, "w", encoding="UTF-8") as f:
f.write(session_str)
async def on_session_change(self, event: SessionEvent, session: Session) -> None:
if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
self.save_session(session.export())
class BskyClient:
def __init__(self):
self.client = AsyncClient()
self.session = SessionReuse()
self.client.on_session_change(self.session.on_session_change)
async def initialize(self):
session = self.session.get_session()
if session:
try:
await self.client.login(session_string=session)
logs.info(
"[bsky] Login with session success, me: %s", self.client.me.handle
)
return
except BadRequestError:
pass
await self.client.login(bsky_username, bsky_password)
logs.info(
"[bsky] Login with username and password success, me: %s",
self.client.me.handle,
)
bsky_client = BskyClient()

71
modules/bsky_api.py Normal file
View File

@ -0,0 +1,71 @@
import contextlib
from typing import Optional
from urllib.parse import urlparse
from pydantic import BaseModel
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType, ChatType
from pyrogram.types import Message
from defs.bsky import Timeline
from init import bot
class Reply(BaseModel):
cid: int
mid: Optional[int] = None
async def process_url(url: str, reply: Reply):
url = urlparse(url)
if url.hostname and url.hostname in ["bsky.app"]:
if url.path.find("profile") < 0:
return
author_handle = str(url.path[url.path.find("profile") + 8 :].split("/")[0])
if url.path.find("post") >= 0:
status_id = str(url.path[url.path.find("post") + 5 :].split("/")[0]).split(
"?"
)[0]
try:
post = await Timeline.fetch_post(author_handle, status_id)
await Timeline.send_to_user(reply, post)
except Exception as e:
print(e)
elif url.path == "/":
return
else:
# 解析用户
try:
user = await Timeline.fetch_user(author_handle)
await Timeline.send_user(reply, user)
except Exception as e:
print(e)
@bot.on_message(filters.incoming & filters.text & filters.regex(r"bsky.app/"))
async def bsky_share(_: Client, message: Message):
if not message.text:
return
if (
message.sender_chat
and message.forward_from_chat
and message.sender_chat.id == message.forward_from_chat.id
):
# 过滤绑定频道的转发
return
mid = message.id
if message.text.startswith("del") and message.chat.type == ChatType.CHANNEL:
with contextlib.suppress(Exception):
await message.delete()
mid = None
reply = Reply(cid=message.chat.id, mid=mid)
for num in range(len(message.entities)):
entity = message.entities[num]
if entity.type == MessageEntityType.URL:
url = message.text[entity.offset : entity.offset + entity.length]
elif entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
else:
continue
await process_url(url, reply)
raise ContinuePropagation

View File

@ -19,3 +19,4 @@ aiosqlite
aiofiles aiofiles
pydantic pydantic
lottie lottie
atproto