import asyncio import traceback from typing import Optional, TYPE_CHECKING from atproto_client.exceptions import BadRequestError from pyrogram.enums import ParseMode from pyrogram.errors import FloodWait from pyrogram.types import ( InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto, ) from init import bot, logs from models.models.bsky import HumanPost, HumanAuthor from models.services.bsky import bsky_client if TYPE_CHECKING: from modules.bsky_api import Reply def flood_wait(): def decorator(function): async def wrapper(*args, **kwargs): try: return await function(*args, **kwargs) except FloodWait as e: logs.warning(f"遇到 FloodWait,等待 {e.value} 秒后重试!") await asyncio.sleep(e.value + 1) return await wrapper(*args, **kwargs) except Exception as e: traceback.print_exc() raise e return wrapper return decorator class Timeline: @staticmethod def get_button(post: HumanPost) -> InlineKeyboardMarkup: buttons = [ InlineKeyboardButton("Source", url=post.url), InlineKeyboardButton("Author", url=post.author.url), ] if post.parent_post: buttons.insert(1, InlineKeyboardButton("RSource", url=post.parent_post.url)) return InlineKeyboardMarkup([buttons]) @staticmethod def get_author_button(author: HumanAuthor) -> InlineKeyboardMarkup: return InlineKeyboardMarkup( [ [ InlineKeyboardButton("Author", url=author.url), ] ] ) @staticmethod def get_media_group(text: str, post: HumanPost) -> list[InputMediaPhoto]: data = [] images = post.images for idx, image in enumerate(images): data.append( InputMediaPhoto( image, caption=text if idx == 0 else None, parse_mode=ParseMode.HTML, ) ) return data @staticmethod def get_post_text(post: HumanPost) -> str: text = "Bsky Post Info\n\n" if post.parent_post: text += f"> {post.parent_post.content}\n\n=====================\n\n" text += post.content text += "\n\n" if (post.is_reply or post.is_quote) and post.parent_post: text += f"{post.parent_post.author.format} {post.parent_post.status}于 {post.parent_post.time_str}\n" text += f"{post.author.format} {post.status}于 {post.time_str}\n" if post.is_repost and post.repost_info: text += f"{post.repost_info.by.format} 转发于 {post.repost_info.time_str}\n" text += f"点赞: {post.like_count} | 引用: {post.quote_count} | 回复: {post.reply_count} | 转发: {post.repost_count}" return text @staticmethod @flood_wait() async def send_to_user(reply: "Reply", post: HumanPost): text = Timeline.get_post_text(post) if post.gif: return await bot.send_animation( reply.cid, post.gif, caption=text, reply_to_message_id=reply.mid, parse_mode=ParseMode.HTML, reply_markup=Timeline.get_button(post), ) elif post.external: return await bot.send_document( reply.cid, post.external, caption=text, reply_to_message_id=reply.mid, parse_mode=ParseMode.HTML, reply_markup=Timeline.get_button(post), ) elif not post.images: return await bot.send_message( reply.cid, text, disable_web_page_preview=True, reply_to_message_id=reply.mid, parse_mode=ParseMode.HTML, reply_markup=Timeline.get_button(post), ) elif len(post.images) == 1: return await bot.send_photo( reply.cid, post.images[0], caption=text, reply_to_message_id=reply.mid, parse_mode=ParseMode.HTML, reply_markup=Timeline.get_button(post), ) else: await bot.send_media_group( reply.cid, Timeline.get_media_group(text, post), reply_to_message_id=reply.mid, ) @staticmethod def code(text: str) -> str: return f"{text}" @staticmethod def get_author_text(author: HumanAuthor) -> str: text = "Bsky User Info\n\n" text += f"Name: {Timeline.code(author.display_name)}\n" text += f"Username: {author.format_handle}\n" if author.description: text += f"Bio: {Timeline.code(author.description)}\n" text += f"Joined: {Timeline.code(author.time_str)}\n" if author.posts_count: text += f"📤 {author.posts_count} " if author.followers_count: text += f"粉丝 {author.followers_count} " if author.follows_count: text += f"关注 {author.follows_count}" return text @staticmethod @flood_wait() async def send_user(reply: "Reply", author: HumanAuthor): text = Timeline.get_author_text(author) if author.avatar_img: return await bot.send_photo( reply.cid, author.avatar_img, caption=text, reply_to_message_id=reply.mid, parse_mode=ParseMode.HTML, reply_markup=Timeline.get_author_button(author), ) else: return await bot.send_message( reply.cid, text, disable_web_page_preview=True, reply_to_message_id=reply.mid, parse_mode=ParseMode.HTML, reply_markup=Timeline.get_author_button(author), ) @staticmethod async def fetch_post(handle: str, cid: str) -> Optional[HumanPost]: try: user = await Timeline.fetch_user(handle) uri = f"at://{user.did}/app.bsky.feed.post/{cid}" post = await bsky_client.client.get_post_thread(uri) return HumanPost.parse_thread(post.thread) except BadRequestError as e: logs.error(f"bsky Error: {e}") return None @staticmethod async def fetch_user(handle: str) -> Optional[HumanAuthor]: try: user = await bsky_client.client.get_profile(handle) return HumanAuthor.parse_detail(user) except BadRequestError as e: logs.error(f"bsky Error: {e}") return None