diff --git a/defs/twitter_api.py b/defs/twitter_api.py index 5447984..7b9fc9d 100644 --- a/defs/twitter_api.py +++ b/defs/twitter_api.py @@ -117,11 +117,7 @@ def get_twitter_status(tweet: Tweet): async def fetch_tweet(tweet_id: int) -> Optional[Tweet]: try: - tweet = await twitter_client.tweet_detail(tweet_id) - for t in tweet: - if t.id_str == str(tweet_id): - return t - return tweet[0] + return await twitter_client.tweet_detail(tweet_id) except TwitterError as e: logs.error(f"Twitter Error: {e}") return None diff --git a/models/apis/twitter/client.py b/models/apis/twitter/client.py index 5afd434..21695f1 100644 --- a/models/apis/twitter/client.py +++ b/models/apis/twitter/client.py @@ -1,7 +1,7 @@ import base64 import json import random -from typing import Dict, Any, List +from typing import Dict, Any from httpx import AsyncClient, Cookies @@ -149,8 +149,7 @@ class TwitterClient: self, endpoint: str, variables: Dict[str, Any], - path: List[str], - ): + ) -> Tweet: _variables = self._variables.copy() _variables.update(variables) data = await self.func( @@ -160,89 +159,32 @@ class TwitterClient: "features": json.dumps(self._features["tweets"]), }, ) - if not path: - instructions = data.json()["data"]["user"]["result"]["timeline"][ - "timeline" - ]["instructions"] - else: - instructions = data.json()["data"] - for key in path: - _instructions = instructions.get(key) - if _instructions is not None: - instructions = _instructions - break - instructions = instructions["instructions"] - for i in instructions: - if i["type"] == "TimelineAddEntries": - return i["entries"] - - @staticmethod - def gather_legacy_from_data(entries: List[Dict], filters: str = "tweet-"): - tweets: List[Tweet] = [] - filter_entries = [] - for entry in entries: - entry_id: str = entry.get("entryId") - if entry_id: - if filters == "none": - if entry_id.startswith("tweet-"): - filter_entries.append(entry) - elif entry_id.startswith( - "homeConversation-" - ) or entry_id.startswith("conversationthread-"): - filter_entries.extend(entry["content"]["items"]) - else: - if entry_id.startswith(filters): - filter_entries.append(entry) - for entry in filter_entries: - retweet = None - quoted = None - content = entry.get("content") or entry.get("item") - tweet = ( - content.get("itemContent", {}) - .get("tweet_results", {}) - .get("result", {}) - ) - if tweet and tweet.get("tweet"): - tweet = tweet["tweet"] - if tweet: - retweet = tweet.get("retweeted_status_result", {}).get("result") - quoted = tweet.get("quoted_status_result", {}).get("result") - if (not tweet) or (not tweet.get("legacy")): - continue - if retweet and not retweet.get("legacy"): - retweet = None - if quoted and not quoted.get("legacy"): - quoted = None - tweet = Tweet( - **tweet["legacy"], - **{"user": tweet["core"]["user_results"]["result"]["legacy"]}, - ) - if retweet: - tweet.retweet = Tweet( - **retweet["legacy"], - **{"user": retweet["core"]["user_results"]["result"]["legacy"]}, + result = data.json()["data"]["tweetResult"]["result"] + type_name = result["__typename"] + if type_name != "Tweet": + raise TwitterError(400, result.get("reason")) + tweet_legacy = result["legacy"] + user = User(**result["core"]["user_results"]["result"]["legacy"]) + tweet = Tweet(**tweet_legacy, **{"user": user}) + if tweet_legacy.get("quoted_status_id_str"): + try: + tweet.quoted = await self.tweet_detail( + int(tweet_legacy["quoted_status_id_str"]) ) - if quoted: - tweet.quoted = Tweet( - **quoted["legacy"], - **{"user": quoted["core"]["user_results"]["result"]["legacy"]}, - ) - tweets.append(tweet) - return tweets + except TwitterError: + pass + return tweet - async def tweet_detail(self, tid: int): - data = await self.pagination_tweets( - "/graphql/3XDB26fBve-MmjHaWTUZxA/TweetDetail", + async def tweet_detail(self, tid: int) -> Tweet: + return await self.pagination_tweets( + "/graphql/0hWvDhmW8YQ-S_ib3azIrw/TweetResultByRestId", variables={ - "focalTweetId": tid, - "with_rux_injections": False, + "tweetId": tid, "withCommunity": True, - "withQuickPromoteEligibilityTweetFields": True, - "withBirdwatchNotes": False, + "includePromotedContent": False, + "withVoice": False, }, - path=["threaded_conversation_with_injections"], ) - return self.gather_legacy_from_data(data, filters="none") async def user_by_screen_name(self, username: str): _variables = {"screen_name": username, "withHighlightedLabel": True} diff --git a/models/apis/twitter/model.py b/models/apis/twitter/model.py index 3cdd6ab..fb61f55 100644 --- a/models/apis/twitter/model.py +++ b/models/apis/twitter/model.py @@ -59,7 +59,7 @@ class User(BaseModel): """ 正在关注 """ location: str name: str - profile_banner_url: Optional[str] + profile_banner_url: Optional[str] = None profile_image_url_https: str screen_name: str verified: bool = False @@ -95,7 +95,7 @@ class Tweet(BaseModel): """ 书签次数 """ created_at: str conversation_id_str: str - extended_entities: Optional[ExtendedEntities] + extended_entities: Optional[ExtendedEntities] = None favorite_count: int """ 喜欢次数 """ full_text: str diff --git a/modules/twitter_api.py b/modules/twitter_api.py index 5460cfa..07054b9 100644 --- a/modules/twitter_api.py +++ b/modules/twitter_api.py @@ -14,6 +14,7 @@ from defs.twitter_api import ( twitter_user_link, twitter_medias, ) +from init import bot from models.apis.twitter.model import MediaItem @@ -84,7 +85,7 @@ async def process_user(message: Message, username: str): ) -# @Client.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/")) +@bot.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/")) async def twitter_share(_: Client, message: Message): if not message.text: return