diff --git a/defs/fix_twitter_api.py b/defs/fix_twitter_api.py
new file mode 100644
index 0000000..7322f72
--- /dev/null
+++ b/defs/fix_twitter_api.py
@@ -0,0 +1,128 @@
+from typing import Optional, List
+
+from pyrogram.enums import ParseMode
+from pyrogram.types import (
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+ InputMediaPhoto,
+ InputMediaVideo,
+ InputMediaAnimation,
+)
+
+from init import logs
+from models.apis.fxtwitter.client import fix_twitter_client, FixTwitterError
+from models.apis.fxtwitter.model import User, FixTweet, FixTweetMedia
+
+
+def twitter_link(tweet: FixTweet):
+ origin = tweet.retweet_or_quoted
+ button = [
+ [
+ InlineKeyboardButton(
+ text="Source",
+ url=tweet.url,
+ ),
+ InlineKeyboardButton(text="Author", url=tweet.author.url),
+ ]
+ ]
+ if origin:
+ button[0].insert(1, InlineKeyboardButton(text="RSource", url=origin.url))
+ return InlineKeyboardMarkup(button)
+
+
+def twitter_user_link(user: User):
+ return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=user.url)]])
+
+
+def twitter_medias(tweet: FixTweet):
+ tweet_media_lists = []
+ if tweet.medias:
+ tweet_media_lists.extend(tweet.medias)
+ if tweet.retweet_or_quoted:
+ tweet_media_lists.extend(tweet.retweet_or_quoted.medias)
+ return tweet_media_lists
+
+
+def twitter_media(tweet_media_lists: List[FixTweetMedia], text: str):
+ media_lists = []
+ for idx, media in enumerate(tweet_media_lists):
+ if len(media_lists) > 10:
+ break
+ if media.type == "photo":
+ media_lists.append(
+ InputMediaPhoto(
+ media.media_url,
+ caption=text if idx == 0 else None,
+ parse_mode=ParseMode.HTML,
+ )
+ )
+ elif media.type == "gif":
+ media_lists.append(
+ InputMediaAnimation(
+ media.media_url,
+ caption=text if idx == 0 else None,
+ parse_mode=ParseMode.HTML,
+ )
+ )
+ else:
+ media_lists.append(
+ InputMediaVideo(
+ media.media_url,
+ caption=text if idx == 0 else None,
+ parse_mode=ParseMode.HTML,
+ )
+ )
+ return media_lists
+
+
+def get_twitter_user(user: User):
+ user_name = user.name
+ user_username = user.screen_name
+ text = (
+ f"Twitter User Info\n\n"
+ f"Name: {user_name}
\n"
+ f'Username: @{user_username}\n'
+ f"Bio: {user.description}
\n"
+ f"Joined: {user.created.strftime('%Y-%m-%d %H:%M:%S')}
\n"
+ f"š¤ {user.tweets} ā¤ļø{user.likes} "
+ f"ē²äø {user.followers} å
³ę³Ø {user.following}"
+ )
+ return text
+
+
+def get_twitter_status(tweet: FixTweet):
+ text = tweet.text or "ę ę å
容"
+ text = f"{text}
"
+ final_text = "Twitter Status Info\n\n" f"{text}\n\n"
+ if tweet.retweet_or_quoted:
+ roq = tweet.retweet_or_quoted
+ final_text += (
+ f'RT: {roq.text or "ę ę å
容"}
\n\n'
+ f'{roq.author.one_line} åč”Øäŗ {roq.created.strftime("%Y-%m-%d %H:%M:%S")}'
+ f"\nš {roq.views} š {roq.likes} š {roq.retweets}\n"
+ f'{tweet.author.one_line} č½¬äŗ {tweet.created.strftime("%Y-%m-%d %H:%M:%S")}\n'
+ f"š {tweet.views} š {tweet.likes} š {tweet.retweets}"
+ )
+ else:
+ final_text += (
+ f'{tweet.author.one_line} åč”Øäŗ {tweet.created.strftime("%Y-%m-%d %H:%M:%S")}'
+ f"\nš {tweet.views} š {tweet.likes} š {tweet.retweets}"
+ )
+ return final_text
+
+
+async def fetch_tweet(tweet_id: int) -> Optional[FixTweet]:
+ try:
+ return await fix_twitter_client.tweet_detail(tweet_id)
+ except FixTwitterError as e:
+ logs.error(f"Twitter Error: {e}")
+ return None
+
+
+async def fetch_user(username: str) -> Optional[User]:
+ try:
+ user = await fix_twitter_client.user_by_screen_name(username)
+ except FixTwitterError as e:
+ logs.error(f"Twitter Error: {e}")
+ return None
+ return user
diff --git a/models/apis/fxtwitter/__init__.py b/models/apis/fxtwitter/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/models/apis/fxtwitter/client.py b/models/apis/fxtwitter/client.py
new file mode 100644
index 0000000..6df95f7
--- /dev/null
+++ b/models/apis/fxtwitter/client.py
@@ -0,0 +1,52 @@
+from typing import Dict
+
+from httpx import AsyncClient
+
+from .model import FixTweet, FixTweetMedia, User
+
+
+class FixTwitterError(Exception):
+ def __init__(self, status_code: int, message: str):
+ self.status_code = status_code
+ self.message = message
+
+
+class FixTwitterClient:
+ def __init__(self):
+ self.client = AsyncClient(timeout=60.0)
+ self.url = "https://api.fxtwitter.com/"
+
+ @staticmethod
+ def gen_tweet(data: Dict) -> FixTweet:
+ tweet = FixTweet(**data)
+ if medias := data.get("media"):
+ if all_media := medias.get("all"):
+ tweet.medias = [FixTweetMedia(**i) for i in all_media]
+ if quote := data.get("quote"):
+ tweet.quoted = FixTwitterClient.gen_tweet(quote)
+ if retweet := data.get("retweet"):
+ tweet.retweeted = FixTwitterClient.gen_tweet(retweet)
+ return tweet
+
+ async def tweet_detail(self, tid: int) -> FixTweet:
+ url = f"{self.url}X/status/{tid}"
+ response = await self.client.get(url)
+ if response.status_code != 200:
+ raise FixTwitterError(response.status_code, response.text)
+ data = response.json()
+ if data.get("code", 200) != 200:
+ raise FixTwitterError(data.get("code"), data.get("message"))
+ return self.gen_tweet(data["tweet"])
+
+ async def user_by_screen_name(self, username: str) -> User:
+ url = f"{self.url}{username}"
+ response = await self.client.get(url)
+ if response.status_code != 200:
+ raise FixTwitterError(response.status_code, response.text)
+ data = response.json()
+ if data.get("code", 200) != 200:
+ raise FixTwitterError(data.get("code"), data.get("message"))
+ return User(**data["user"])
+
+
+fix_twitter_client = FixTwitterClient()
diff --git a/models/apis/fxtwitter/model.py b/models/apis/fxtwitter/model.py
new file mode 100644
index 0000000..e23b026
--- /dev/null
+++ b/models/apis/fxtwitter/model.py
@@ -0,0 +1,76 @@
+from datetime import datetime, timedelta
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+
+class User(BaseModel):
+ id: int
+ name: str
+ screen_name: str
+ avatar_url: str = ""
+ banner_url: str = ""
+ description: str = ""
+ location: str = ""
+ url: str
+ followers: int = 0
+ following: int = 0
+ joined: str
+ tweets: int = 0
+ likes: int = 0
+
+ @property
+ def created(self) -> datetime:
+ """å
„ęØę¶é“"""
+ # 'Fri Jun 30 12:08:56 +0000 2023'
+ return datetime.strptime(self.joined, "%a %b %d %H:%M:%S %z %Y") + timedelta(
+ hours=8
+ )
+
+ @property
+ def icon(self) -> str:
+ """夓å"""
+ return self.avatar_url.replace("_normal.jpg", ".jpg")
+
+ @property
+ def one_line(self) -> str:
+ return f'{self.screen_name}'
+
+
+class FixTweetMedia(BaseModel):
+ type: str
+ url: str
+ width: int = 0
+ height: int = 0
+ altText: str = ""
+
+
+class FixTweet(BaseModel):
+ url: str
+ id: int
+ text: str
+ replies: int
+ """ åå¤ """
+ retweets: int
+ """ č½¬ęØ """
+ likes: int
+ """ åę¬¢ """
+ created_at: str
+ views: int
+ """ é
čÆ»ę¬”ę° """
+ source: str
+ medias: Optional[List[FixTweetMedia]] = None
+ author: User
+ retweeted: "FixTweet" = None
+ quoted: "FixTweet" = None
+
+ @property
+ def created(self) -> datetime:
+ # 'Fri Jun 30 12:08:56 +0000 2023'
+ return datetime.strptime(
+ self.created_at, "%a %b %d %H:%M:%S %z %Y"
+ ) + timedelta(hours=8)
+
+ @property
+ def retweet_or_quoted(self) -> "FixTweet":
+ return self.retweeted or self.quoted
diff --git a/modules/twitter_api.py b/modules/twitter_api.py
index 07054b9..a79287c 100644
--- a/modules/twitter_api.py
+++ b/modules/twitter_api.py
@@ -4,7 +4,7 @@ from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType
from pyrogram.types import Message
-from defs.twitter_api import (
+from defs.fix_twitter_api import (
fetch_tweet,
get_twitter_status,
twitter_link,
@@ -85,6 +85,28 @@ async def process_user(message: Message, username: str):
)
+async def process_url(url: str, message: Message):
+ url = urlparse(url)
+ if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]:
+ if url.path.find("status") >= 0:
+ status_id = str(
+ url.path[url.path.find("status") + 7 :].split("/")[0]
+ ).split("?")[0]
+ try:
+ await process_status(message, status_id)
+ except Exception as e:
+ print(e)
+ elif url.path == "/":
+ return
+ else:
+ # č§£ęēØę·
+ uid = url.path.replace("/", "")
+ try:
+ await process_user(message, uid)
+ except Exception as e:
+ print(e)
+
+
@bot.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/"))
async def twitter_share(_: Client, message: Message):
if not message.text:
@@ -97,23 +119,5 @@ async def twitter_share(_: Client, message: Message):
url = entity.url
else:
continue
- url = urlparse(url)
- if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]:
- if url.path.find("status") >= 0:
- status_id = str(
- url.path[url.path.find("status") + 7 :].split("/")[0]
- ).split("?")[0]
- try:
- await process_status(message, status_id)
- except Exception as e:
- print(e)
- elif url.path == "/":
- return
- else:
- # č§£ęēØę·
- uid = url.path.replace("/", "")
- try:
- await process_user(message, uid)
- except Exception as e:
- print(e)
+ await process_url(url, message)
raise ContinuePropagation