"""Message handler that answers Twitter links with the linked tweet or profile."""

import contextlib
from typing import Optional
from urllib.parse import urlparse

from pydantic import BaseModel
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType
from pyrogram.types import Message

from defs.fix_twitter_api import (
    fetch_tweet,
    get_twitter_status,
    twitter_link,
    twitter_media,
    fetch_user,
    get_twitter_user,
    twitter_user_link,
    twitter_medias,
)
from init import bot
from models.apis.fxtwitter.model import FixTweetMedia

# Hostnames (exact match) whose links are handled by this module.
_TWITTER_HOSTS = ("twitter.com", "vxtwitter.com", "fxtwitter.com", "x.com")

# Number of characters to skip from the start of "status" to reach the id
# that follows it in a path such as "/<user>/status/<id>".
_STATUS_SKIP = len("status/")


class Reply(BaseModel):
    """Destination of a bot answer."""

    # Chat the answer is sent to.
    cid: int
    # Message to reply to; None sends the answer without a reply reference.
    mid: Optional[int] = None


async def send_single_tweet(
    reply: Reply, media: FixTweetMedia, text: str, button
) -> None:
    """Send a tweet that carries exactly one media item.

    The sender is picked from ``media.type``: photo, video or gif (sent as an
    animation); any other type is sent as a document.

    Args:
        reply: Target chat and optional message to reply to.
        media: The tweet's single media item.
        text: Caption for the media.
        button: Reply markup attached to the message.
    """
    senders = {
        "photo": bot.send_photo,
        "video": bot.send_video,
        "gif": bot.send_animation,
    }
    # `Client.send_*` methods do not accept `quote` (that keyword belongs to
    # the bound `Message.reply_*` helpers) and `Client` has no
    # `reply_document`; replying is expressed through `reply_to_message_id`.
    send = senders.get(media.type, bot.send_document)
    await send(
        reply.cid,
        media.url,
        caption=text,
        reply_markup=button,
        reply_to_message_id=reply.mid,
    )


async def process_status(reply: Reply, status: str) -> None:
    """Fetch the tweet with id ``status`` and send it to ``reply``.

    Nothing is sent when ``status`` is not an integer or the tweet cannot be
    fetched. A single media item is sent with caption and button, several
    media items as a media group, and a tweet without media as plain text.

    Args:
        reply: Target chat and optional message to reply to.
        status: Tweet id as extracted from the URL.
    """
    try:
        tweet_id = int(status)
    except ValueError:
        return
    tweet = await fetch_tweet(tweet_id)
    if not tweet:
        return

    text = get_twitter_status(tweet)
    button = twitter_link(tweet)
    medias = twitter_medias(tweet)
    if len(medias) == 1:
        await send_single_tweet(reply, medias[0], text, button)
        return

    media_lists = twitter_media(medias, text)
    if media_lists:
        # The button is not passed here: no reply markup is attached to the
        # media group.
        await bot.send_media_group(
            reply.cid,
            media_lists,
            reply_to_message_id=reply.mid,
        )
    else:
        await bot.send_message(
            reply.cid,
            text,
            reply_markup=button,
            reply_to_message_id=reply.mid,
        )


async def process_user(reply: Reply, username: str) -> None:
    """Fetch the profile ``username`` and send its icon with a caption.

    Nothing is sent when the user cannot be fetched.

    Args:
        reply: Target chat and optional message to reply to.
        username: Twitter user name taken from the URL path.
    """
    user = await fetch_user(username)
    if not user:
        return
    text = get_twitter_user(user)
    button = twitter_user_link(user)
    await bot.send_photo(
        reply.cid,
        user.icon,
        caption=text,
        reply_markup=button,
        reply_to_message_id=reply.mid,
    )


async def process_url(url: str, reply: Reply) -> None:
    """Dispatch one URL to the tweet or the profile handler.

    URLs whose hostname is not one of the known Twitter hosts are ignored.
    Errors raised while answering are printed and swallowed so that one bad
    link does not abort the handling of the remaining links.

    Args:
        url: The URL found in the message.
        reply: Target chat and optional message to reply to.
    """
    parsed = urlparse(url)
    if parsed.hostname not in _TWITTER_HOSTS:
        return

    path = parsed.path
    marker = path.find("status")
    if marker >= 0:
        # Everything after "status/" up to the next "/" is the tweet id.
        status_id = path[marker + _STATUS_SKIP:].split("/")[0].split("?")[0]
        try:
            await process_status(reply, status_id)
        except Exception as e:
            print(e)
        return

    # Resolve the user profile.
    uid = path.replace("/", "")
    if not uid:
        # Bare host link ("" or "/"): there is no user to look up.
        return
    try:
        await process_user(reply, uid)
    except Exception as e:
        print(e)


def _entity_text(text: str, offset: int, length: int) -> str:
    """Return the part of ``text`` covered by a message entity.

    Telegram expresses entity offsets and lengths in UTF-16 code units, so the
    slice is taken on the UTF-16 encoding; slicing the ``str`` directly would
    be shifted by every non-BMP character (e.g. emoji) preceding the entity.
    """
    raw = text.encode("utf-16-le")
    return raw[offset * 2:(offset + length) * 2].decode("utf-16-le")


# NOTE(review): this pattern only matches text containing "twitter.com/"
# (which also covers vxtwitter.com/ and fxtwitter.com/); bare "x.com/" links
# do not trigger the handler although process_url accepts that host — confirm
# whether that is intended.
@bot.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/"))
async def twitter_share(_: Client, message: Message):
    """Answer every Twitter link contained in an incoming text message.

    A message starting with ``del`` is deleted (best effort) and the answers
    are then sent without a reply reference.

    Raises:
        ContinuePropagation: After the links were processed, so that other
            handlers still receive the message.
    """
    if not message.text:
        return
    if (
        message.sender_chat
        and message.forward_from_chat
        and message.sender_chat.id == message.forward_from_chat.id
    ):
        # Skip automatic forwards from the linked channel.
        return

    mid = message.id
    if message.text.startswith("del"):
        with contextlib.suppress(Exception):
            await message.delete()
            mid = None
    reply = Reply(cid=message.chat.id, mid=mid)

    # `entities` is None when the message carries no entities.
    for entity in message.entities or []:
        if entity.type == MessageEntityType.URL:
            url = _entity_text(message.text, entity.offset, entity.length)
        elif entity.type == MessageEntityType.TEXT_LINK:
            url = entity.url
        else:
            continue
        await process_url(url, reply)
    raise ContinuePropagation