from typing import Optional, List from pyrogram.enums import ParseMode from pyrogram.types import ( InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto, InputMediaVideo, InputMediaAnimation, ) from init import logs from models.apis.fxtwitter.client import fix_twitter_client, FixTwitterError from models.apis.fxtwitter.model import User, FixTweet, FixTweetMedia def twitter_link(tweet: FixTweet): origin = tweet.retweet_or_quoted button = [ [ InlineKeyboardButton( text="Source", url=tweet.url, ), InlineKeyboardButton(text="Author", url=tweet.author.url), ] ] if origin: button[0].insert(1, InlineKeyboardButton(text="RSource", url=origin.url)) return InlineKeyboardMarkup(button) def twitter_user_link(user: User): return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=user.url)]]) def twitter_medias(tweet: FixTweet): tweet_media_lists = [] if tweet.medias: tweet_media_lists.extend(tweet.medias) if tweet.retweet_or_quoted: tweet_media_lists.extend(tweet.retweet_or_quoted.medias) return tweet_media_lists def twitter_media(tweet_media_lists: List[FixTweetMedia], text: str): media_lists = [] for idx, media in enumerate(tweet_media_lists): if len(media_lists) > 10: break if media.type == "photo": media_lists.append( InputMediaPhoto( media.url, caption=text if idx == 0 else None, parse_mode=ParseMode.HTML, ) ) elif media.type == "gif": media_lists.append( InputMediaAnimation( media.url, caption=text if idx == 0 else None, parse_mode=ParseMode.HTML, ) ) elif media.type == "video": media_lists.append( InputMediaVideo( media.url, thumb=media.thumbnail_url, duration=media.duration, caption=text if idx == 0 else None, parse_mode=ParseMode.HTML, ) ) return media_lists def get_twitter_user(user: User): user_name = user.name user_username = user.screen_name text = ( f"Twitter User Info\n\n" f"Name: {user_name}\n" f'Username: @{user_username}\n' f"Bio: {user.description}\n" f"Joined: {user.created.strftime('%Y-%m-%d %H:%M:%S')}\n" f"๐Ÿ“ค {user.tweets} โค๏ธ{user.likes} " f"็ฒ‰ไธ {user.followers} ๅ…ณๆณจ {user.following}" ) return text def get_twitter_status(tweet: FixTweet): text = tweet.text or "ๆš‚ ๆ—  ๅ†… ๅฎน" text = f"{text}" final_text = "Twitter Status Info\n\n" f"{text}\n\n" if tweet.retweet_or_quoted: roq = tweet.retweet_or_quoted final_text += ( f'RT: {roq.text or "ๆš‚ ๆ—  ๅ†… ๅฎน"}\n\n' f'{roq.author.one_line} ๅ‘่กจไบŽ {roq.created.strftime("%Y-%m-%d %H:%M:%S")}' f"\n๐Ÿ‘ {roq.views} ๐Ÿ‘ {roq.likes} ๐Ÿ” {roq.retweets}\n" f'{tweet.author.one_line} ่ฝฌไบŽ {tweet.created.strftime("%Y-%m-%d %H:%M:%S")}\n' f"๐Ÿ‘ {tweet.views} ๐Ÿ‘ {tweet.likes} ๐Ÿ” {tweet.retweets}" ) else: final_text += ( f'{tweet.author.one_line} ๅ‘่กจไบŽ {tweet.created.strftime("%Y-%m-%d %H:%M:%S")}' f"\n๐Ÿ‘ {tweet.views} ๐Ÿ‘ {tweet.likes} ๐Ÿ” {tweet.retweets}" ) return final_text async def fetch_tweet(tweet_id: int) -> Optional[FixTweet]: try: return await fix_twitter_client.tweet_detail(tweet_id) except FixTwitterError as e: logs.error(f"Twitter Error: {e}") return None async def fetch_user(username: str) -> Optional[User]: try: user = await fix_twitter_client.user_by_screen_name(username) except FixTwitterError as e: logs.error(f"Twitter Error: {e}") return None return user