iShotaBot/models/apis/twitter/client.py
2023-07-16 17:49:21 +08:00

260 lines
9.9 KiB
Python

import base64
import json
import random
from typing import Dict, Any, List
from httpx import AsyncClient, Cookies
from .model import Tweet, User
class TwitterError(Exception):
def __init__(self, status_code: int, message: str):
self.status_code = status_code
self.message = message
class TwitterClient:
headers = {
"authorization": "",
"x-guest-token": "",
"x-twitter-auth-type": "",
"x-twitter-client-language": "en",
"x-twitter-active-user": "yes",
"x-csrf-token": "",
"Referer": "https://twitter.com/",
}
tokens = ["CjulERsDeqhhjSme66ECg:IQWdVyqFxghAtURHGeGiWAsmCAGmdW3WmbEx6Hck"]
_variables = {
"count": 20,
"includePromotedContent": False,
"withSuperFollowsUserFields": True,
"withBirdwatchPivots": False,
"withDownvotePerspective": False,
"withReactionsMetadata": False,
"withReactionsPerspective": False,
"withSuperFollowsTweetFields": True,
"withClientEventToken": False,
"withBirdwatchNotes": False,
"withVoice": True,
"withV2Timeline": False,
"__fs_interactive_text": False,
"__fs_dont_mention_me_view_api_enabled": False,
}
_features = {
"tweets": {
"rweb_lists_timeline_redesign_enabled": True,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_timeline_navigation_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"tweetypie_unmention_optimization_enabled": True,
"responsive_web_edit_tweet_api_enabled": True,
"graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
"view_counts_everywhere_api_enabled": True,
"longform_notetweets_consumption_enabled": True,
"responsive_web_twitter_article_tweet_consumption_enabled": False,
"tweet_awards_web_tipping_enabled": False,
"freedom_of_speech_not_reach_fetch_enabled": True,
"standardized_nudges_misinfo": True,
"tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True,
"longform_notetweets_rich_text_read_enabled": True,
"longform_notetweets_inline_media_enabled": True,
"responsive_web_media_download_video_enabled": False,
"responsive_web_enhance_cards_enabled": False,
},
"user_by_screen_name": {
"hidden_profile_likes_enabled": False,
"responsive_web_graphql_exclude_directive_enabled": True,
"verified_phone_label_enabled": False,
"subscriptions_verification_info_verified_since_enabled": True,
"highlights_tweets_tab_ui_enabled": True,
"creator_subscriptions_tweet_preview_api_enabled": True,
"responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
"responsive_web_graphql_timeline_navigation_enabled": True,
},
}
def __init__(self):
self.cookie = Cookies()
self.client = AsyncClient(cookies=self.cookie, timeout=60.0)
self.inited = False
async def request(
self, url: str, method: str = "GET", headers: Dict[str, str] = None, **kwargs
):
headers = headers or self.headers
headers = {key: value for key, value in headers.items() if value is not None}
return await self.client.request(method, url, headers=headers, **kwargs)
async def reset_session(self):
self.headers[
"authorization"
] = f"Basic {base64.b64encode(self.tokens[0].encode()).decode()}"
response = await self.request(
"https://api.twitter.com/oauth2/token",
method="POST",
params={"grant_type": "client_credentials"},
)
access_token = response.json()["access_token"]
self.headers["authorization"] = f"Bearer {access_token}"
# 生成csrf - token
# 32个随机十六进制字符
csrf_token = "".join([random.choice("0123456789abcdef") for _ in range(32)])
self.cookie.set("ct0", csrf_token, domain=".twitter.com")
self.headers["x-csrf-token"] = csrf_token
self.headers["x-guest-token"] = ""
# 发起初始化请求
response = await self.request(
"https://api.twitter.com/1.1/guest/activate.json",
method="POST",
)
# 获取guest - token
guest_token = response.json()["guest_token"]
self.headers["x-guest-token"] = guest_token
self.cookie.set("gt", guest_token, domain=".twitter.com")
# 发起第二个初始化请求, 获取_twitter_sess
await self.request(
"https://twitter.com/i/js_inst",
method="GET",
params={"c_name": "ui_metrics"},
)
async def func(self, url: str, method: str = "GET", **kwargs):
if not self.inited:
await self.reset_session()
self.inited = True
response = await self.request(
url,
method=method,
**kwargs,
)
if response.status_code == 403:
await self.reset_session()
response = await self.request(
url,
method=method,
**kwargs,
)
if response.status_code != 200:
raise TwitterError(response.status_code, response.text)
csrf_token = response.cookies.get("ct0")
if csrf_token:
self.headers["x-csrf-token"] = csrf_token
return response
async def pagination_tweets(
self,
endpoint: str,
variables: Dict[str, Any],
path: List[str],
):
_variables = self._variables.copy()
_variables.update(variables)
data = await self.func(
f"https://twitter.com/i/api{endpoint}",
params={
"variables": json.dumps(_variables),
"features": json.dumps(self._features["tweets"]),
},
)
if not path:
instructions = data.json()["data"]["user"]["result"]["timeline"][
"timeline"
]["instructions"]
else:
instructions = data.json()["data"]
for key in path:
_instructions = instructions.get(key)
if _instructions is not None:
instructions = _instructions
break
instructions = instructions["instructions"]
for i in instructions:
if i["type"] == "TimelineAddEntries":
return i["entries"]
@staticmethod
def gather_legacy_from_data(entries: List[Dict], filters: str = "tweet-"):
tweets: List[Tweet] = []
filter_entries = []
for entry in entries:
entry_id: str = entry.get("entryId")
if entry_id:
if filters == "none":
if entry_id.startswith("tweet-"):
filter_entries.append(entry)
elif entry_id.startswith(
"homeConversation-"
) or entry_id.startswith("conversationthread-"):
filter_entries.extend(entry["content"]["items"])
else:
if entry_id.startswith(filters):
filter_entries.append(entry)
for entry in filter_entries:
retweet = None
quoted = None
content = entry.get("content") or entry.get("item")
tweet = (
content.get("itemContent", {})
.get("tweet_results", {})
.get("result", {})
)
if tweet and tweet.get("tweet"):
tweet = tweet["tweet"]
if tweet:
retweet = tweet.get("retweeted_status_result", {}).get("result")
quoted = tweet.get("quoted_status_result", {}).get("result")
if (not tweet) or (not tweet.get("legacy")):
continue
if retweet and not retweet.get("legacy"):
retweet = None
if quoted and not quoted.get("legacy"):
quoted = None
tweet = Tweet(
**tweet["legacy"],
**{"user": tweet["core"]["user_results"]["result"]["legacy"]},
)
if retweet:
tweet.retweet = Tweet(
**retweet["legacy"],
**{"user": retweet["core"]["user_results"]["result"]["legacy"]},
)
if quoted:
tweet.quoted = Tweet(
**quoted["legacy"],
**{"user": quoted["core"]["user_results"]["result"]["legacy"]},
)
tweets.append(tweet)
return tweets
async def tweet_detail(self, tid: int):
data = await self.pagination_tweets(
"/graphql/3XDB26fBve-MmjHaWTUZxA/TweetDetail",
variables={
"focalTweetId": tid,
"with_rux_injections": False,
"withCommunity": True,
"withQuickPromoteEligibilityTweetFields": True,
"withBirdwatchNotes": False,
},
path=["threaded_conversation_with_injections"],
)
return self.gather_legacy_from_data(data, filters="none")
async def user_by_screen_name(self, username: str):
_variables = {"screen_name": username, "withHighlightedLabel": True}
data = await self.func(
"https://twitter.com/i/api/graphql/oUZZZ8Oddwxs8Cd3iW3UEA/UserByScreenName",
params={
"variables": json.dumps(_variables),
"features": json.dumps(self._features["user_by_screen_name"]),
},
)
return User(**data.json()["data"]["user"]["result"]["legacy"])
twitter_client = TwitterClient()