from typing import Optional, List

from pyrogram.enums import ParseMode
from pyrogram.types import (
    InlineKeyboardMarkup,
    InlineKeyboardButton,
    InputMediaPhoto,
    InputMediaVideo,
    InputMediaAnimation,
)

from init import logs
from models.apis.twitter.client import twitter_client, TwitterError
from models.apis.twitter.model import User, Tweet, MediaItem

# Timestamp layout shared by every user-facing message built in this module.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Upper bound on the number of items returned by ``twitter_media``.
# NOTE(review): matches Telegram's 10-item media-group limit (sendMediaGroup).
_MAX_MEDIA_ITEMS = 10

# Placeholder shown when a tweet has no text.
_EMPTY_TEXT = "暂 无 内 容"


def twitter_link(tweet: Tweet):
    """Build the inline keyboard shown under a tweet message.

    The single row holds a "Source" button (the tweet URL) and an "Author"
    button (the tweet author's URL).  When ``tweet.retweet_or_quoted`` is
    truthy, an "RSource" button linking to that original tweet is placed
    between the two.

    :param tweet: the tweet being displayed.
    :return: an ``InlineKeyboardMarkup`` with one row of URL buttons.
    """
    origin = tweet.retweet_or_quoted
    row = [InlineKeyboardButton(text="Source", url=tweet.url)]
    if origin:
        row.append(InlineKeyboardButton(text="RSource", url=origin.url))
    row.append(InlineKeyboardButton(text="Author", url=tweet.user.url))
    return InlineKeyboardMarkup([row])


def twitter_user_link(user: User):
    """Build a one-button inline keyboard linking to ``user.url``.

    :param user: the Twitter user being displayed.
    :return: an ``InlineKeyboardMarkup`` containing a single "Author" button.
    """
    author_button = InlineKeyboardButton(text="Author", url=user.url)
    return InlineKeyboardMarkup([[author_button]])


def twitter_medias(tweet: Tweet):
    """Collect the media items attached to a tweet and its referenced tweets.

    Items are gathered, in this order, from the tweet itself, from
    ``tweet.retweet`` and from ``tweet.quoted``; a source is skipped when it
    (or its ``extended_entities``) is falsy.

    :param tweet: the tweet to inspect.
    :return: a new list of media items (possibly empty).
    """
    collected = []
    for source in (tweet, tweet.retweet, tweet.quoted):
        if source and source.extended_entities:
            collected.extend(source.extended_entities.media)
    return collected


def twitter_media(tweet_media_lists: List[MediaItem], text: str):
    """Convert tweet media items into Pyrogram ``InputMedia*`` objects.

    At most ``_MAX_MEDIA_ITEMS`` items are converted.  Only the first item
    carries ``text`` as its caption; the others get ``caption=None``.  Every
    item is created with ``ParseMode.HTML``.

    :param tweet_media_lists: media items, e.g. from ``twitter_medias``.
    :param text: caption attached to the first media item.
    :return: a list of ``InputMediaPhoto`` / ``InputMediaAnimation`` /
        ``InputMediaVideo`` objects.
    """
    media_lists = []
    for idx, media in enumerate(tweet_media_lists[:_MAX_MEDIA_ITEMS]):
        if media.type == "photo":
            media_cls = InputMediaPhoto
        elif media.type == "gif":
            media_cls = InputMediaAnimation
        else:
            # Any other media type is sent as a video.
            media_cls = InputMediaVideo
        media_lists.append(
            media_cls(
                media.media_url,
                caption=text if idx == 0 else None,
                parse_mode=ParseMode.HTML,
            )
        )
    return media_lists


def get_twitter_user(user: User):
    """Render a multi-line text summary of a Twitter user.

    The summary lists name (prefixed with 💎 when verified and 🔒 when
    protected), username, bio, join time and the status / favourite /
    follower / following counters.

    :param user: the user to describe.
    :return: the formatted text.
    """
    verified = "💎" if user.verified else ""
    protected = "🔒" if user.protected else ""
    return (
        "Twitter User Info\n\n"
        f"Name: {verified}{protected}{user.name}\n"
        f"Username: @{user.screen_name}\n"
        f"Bio: {user.description}\n"
        f"Joined: {user.created.strftime(_TIME_FORMAT)}\n"
        f"📤 {user.statuses_count} ❤️{user.favourites_count} "
        f"粉丝 {user.followers_count} 关注 {user.friends_count}"
    )


def get_twitter_status(tweet: Tweet):
    """Render a multi-line text summary of a tweet.

    The tweet's own text comes first (or a placeholder when it is empty).
    When ``tweet.retweet_or_quoted`` is truthy, the referenced tweet's text,
    author line, time and counters follow, and then the author line, time
    and counters of ``tweet`` itself.  Otherwise only the author line, time
    and counters of ``tweet`` are appended.

    :param tweet: the tweet to describe.
    :return: the formatted text.
    """
    body = tweet.full_text or _EMPTY_TEXT
    final_text = f"Twitter Status Info\n\n{body}\n\n"

    roq = tweet.retweet_or_quoted
    if roq:
        final_text += (
            f"RT: {roq.full_text or _EMPTY_TEXT}\n\n"
            f"{roq.user.one_line} 发表于 {roq.created.strftime(_TIME_FORMAT)}"
            f"\n👍 {roq.favorite_count} 🔁 {roq.retweet_count}\n"
            f"{tweet.user.one_line} 转于 {tweet.created.strftime(_TIME_FORMAT)}\n"
            f"👍 {tweet.favorite_count} 🔁 {tweet.retweet_count}"
        )
    else:
        final_text += (
            f"{tweet.user.one_line} 发表于 {tweet.created.strftime(_TIME_FORMAT)}"
            f"\n👍 {tweet.favorite_count} 🔁 {tweet.retweet_count}"
        )
    return final_text


async def fetch_tweet(tweet_id: int) -> Optional[Tweet]:
    """Fetch a single tweet by id.

    The client returns a sequence of tweets; the one whose ``id_str`` equals
    ``tweet_id`` is preferred, falling back to the first entry.

    :param tweet_id: numeric id of the wanted tweet.
    :return: the matching (or first) tweet, or ``None`` when the client
        raises ``TwitterError`` or returns no tweets at all.
    """
    try:
        tweets = await twitter_client.tweet_detail(tweet_id)
    except TwitterError as e:
        logs.error(f"Twitter Error: {e}")
        return None

    wanted_id = str(tweet_id)
    for candidate in tweets:
        if candidate.id_str == wanted_id:
            return candidate
    # Guard the fallback: indexing an empty result would raise IndexError.
    return tweets[0] if tweets else None


async def fetch_user(username: str) -> Optional[User]:
    """Fetch a Twitter user by screen name.

    :param username: the user's screen name.
    :return: the user, or ``None`` when the client raises ``TwitterError``
        (the error is logged).
    """
    try:
        user = await twitter_client.user_by_screen_name(username)
    except TwitterError as e:
        logs.error(f"Twitter Error: {e}")
        return None
    return user