mirror of
https://github.com/Xtao-Labs/iShotaBot.git
synced 2024-11-25 01:29:45 +00:00
260 lines
9.9 KiB
Python
260 lines
9.9 KiB
Python
import base64
|
|
import json
|
|
import random
|
|
from typing import Dict, Any, List
|
|
|
|
from httpx import AsyncClient, Cookies
|
|
|
|
from .model import Tweet, User
|
|
|
|
|
|
class TwitterError(Exception):
    """Error raised when a Twitter API request does not succeed.

    Attributes:
        status_code: HTTP status code of the failing response.
        message: Text describing the failure (the response body when raised
            by the client in this module).
    """

    def __init__(self, status_code: int, message: str):
        # Forward both values so ``args`` keeps the (status_code, message) pair.
        super().__init__(status_code, message)
        self.message = message
        self.status_code = status_code
|
|
|
|
|
|
class TwitterClient:
    """Async client for Twitter's web GraphQL API using a guest session.

    A session is bootstrapped lazily on the first call to :meth:`func` (see
    :meth:`reset_session`) and is re-created once when a request is answered
    with HTTP 403.
    """

    # Template of the headers sent with every request. Each instance works on
    # its own copy (see ``__init__``); the empty values are filled in by
    # ``reset_session``.
    headers = {
        "authorization": "",
        "x-guest-token": "",
        "x-twitter-auth-type": "",
        "x-twitter-client-language": "en",
        "x-twitter-active-user": "yes",
        "x-csrf-token": "",
        "Referer": "https://twitter.com/",
    }
    # Credentials sent as HTTP Basic auth to the oauth2/token endpoint; only
    # the first entry is used.
    tokens = ["CjulERsDeqhhjSme66ECg:IQWdVyqFxghAtURHGeGiWAsmCAGmdW3WmbEx6Hck"]

    # Default GraphQL ``variables``; per-call values are merged on top of these
    # in ``pagination_tweets``.
    _variables = {
        "count": 20,
        "includePromotedContent": False,
        "withSuperFollowsUserFields": True,
        "withBirdwatchPivots": False,
        "withDownvotePerspective": False,
        "withReactionsMetadata": False,
        "withReactionsPerspective": False,
        "withSuperFollowsTweetFields": True,
        "withClientEventToken": False,
        "withBirdwatchNotes": False,
        "withVoice": True,
        "withV2Timeline": False,
        "__fs_interactive_text": False,
        "__fs_dont_mention_me_view_api_enabled": False,
    }
    # GraphQL ``features`` flags, keyed by the kind of query they are sent with.
    _features = {
        "tweets": {
            "rweb_lists_timeline_redesign_enabled": True,
            "responsive_web_graphql_exclude_directive_enabled": True,
            "verified_phone_label_enabled": False,
            "creator_subscriptions_tweet_preview_api_enabled": True,
            "responsive_web_graphql_timeline_navigation_enabled": True,
            "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
            "tweetypie_unmention_optimization_enabled": True,
            "responsive_web_edit_tweet_api_enabled": True,
            "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
            "view_counts_everywhere_api_enabled": True,
            "longform_notetweets_consumption_enabled": True,
            "responsive_web_twitter_article_tweet_consumption_enabled": False,
            "tweet_awards_web_tipping_enabled": False,
            "freedom_of_speech_not_reach_fetch_enabled": True,
            "standardized_nudges_misinfo": True,
            "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True,
            "longform_notetweets_rich_text_read_enabled": True,
            "longform_notetweets_inline_media_enabled": True,
            "responsive_web_media_download_video_enabled": False,
            "responsive_web_enhance_cards_enabled": False,
        },
        "user_by_screen_name": {
            "hidden_profile_likes_enabled": False,
            "responsive_web_graphql_exclude_directive_enabled": True,
            "verified_phone_label_enabled": False,
            "subscriptions_verification_info_verified_since_enabled": True,
            "highlights_tweets_tab_ui_enabled": True,
            "creator_subscriptions_tweet_preview_api_enabled": True,
            "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
            "responsive_web_graphql_timeline_navigation_enabled": True,
        },
    }

    def __init__(self):
        """Create the HTTP client and cookie jar; no network traffic happens here."""
        # Work on a per-instance copy: reset_session()/func() write tokens into
        # the headers, and mutating the class-level dict would leak one
        # client's session into every other instance.
        self.headers = dict(self.headers)
        self.cookie = Cookies()
        self.client = AsyncClient(cookies=self.cookie, timeout=60.0)
        self.inited = False

    async def request(
        self, url: str, method: str = "GET", headers: Dict[str, str] = None, **kwargs
    ):
        """Send one HTTP request through the underlying ``AsyncClient``.

        Args:
            url: Absolute URL to request.
            method: HTTP method name.
            headers: Headers to send; when omitted or empty, this client's
                session headers are used. Entries whose value is ``None`` are
                dropped before sending.
            **kwargs: Passed through unchanged to ``AsyncClient.request``.

        Returns:
            The response object returned by ``AsyncClient.request``.
        """
        headers = headers or self.headers
        headers = {key: value for key, value in headers.items() if value is not None}
        return await self.client.request(method, url, headers=headers, **kwargs)

    async def reset_session(self):
        """Obtain a fresh bearer token, CSRF token and guest token.

        Updates ``self.headers`` and the cookie jar in place.

        Raises:
            TwitterError: If the token or guest-activation endpoint does not
                answer with HTTP 200.
        """
        basic_credentials = base64.b64encode(self.tokens[0].encode()).decode()
        self.headers["authorization"] = f"Basic {basic_credentials}"
        response = await self.request(
            "https://api.twitter.com/oauth2/token",
            method="POST",
            params={"grant_type": "client_credentials"},
        )
        # Fail with the same exception type func() uses instead of an opaque
        # KeyError/JSON error on an error body.
        if response.status_code != 200:
            raise TwitterError(response.status_code, response.text)
        access_token = response.json()["access_token"]
        self.headers["authorization"] = f"Bearer {access_token}"
        # Generate the CSRF token: 32 random hexadecimal characters.
        csrf_token = "".join(random.choices("0123456789abcdef", k=32))
        self.cookie.set("ct0", csrf_token, domain=".twitter.com")
        self.headers["x-csrf-token"] = csrf_token
        self.headers["x-guest-token"] = ""
        # Send the initialization request.
        response = await self.request(
            "https://api.twitter.com/1.1/guest/activate.json",
            method="POST",
        )
        if response.status_code != 200:
            raise TwitterError(response.status_code, response.text)
        # Obtain the guest token.
        guest_token = response.json()["guest_token"]
        self.headers["x-guest-token"] = guest_token
        self.cookie.set("gt", guest_token, domain=".twitter.com")
        # Send the second initialization request to obtain _twitter_sess.
        await self.request(
            "https://twitter.com/i/js_inst",
            method="GET",
            params={"c_name": "ui_metrics"},
        )

    async def func(self, url: str, method: str = "GET", **kwargs):
        """Send a request within the session, initializing or renewing it as needed.

        The session is set up on first use. If the response status is 403 the
        session is reset and the request is retried once.

        Args:
            url: Absolute URL to request.
            method: HTTP method name.
            **kwargs: Passed through to :meth:`request`.

        Returns:
            The successful (HTTP 200) response.

        Raises:
            TwitterError: If the final response status is not 200.
        """
        if not self.inited:
            await self.reset_session()
            self.inited = True
        response = await self.request(url, method=method, **kwargs)
        if response.status_code == 403:
            await self.reset_session()
            response = await self.request(url, method=method, **kwargs)
        if response.status_code != 200:
            raise TwitterError(response.status_code, response.text)
        # Keep the header in sync when the response carries a new "ct0" cookie.
        csrf_token = response.cookies.get("ct0")
        if csrf_token:
            self.headers["x-csrf-token"] = csrf_token
        return response

    async def pagination_tweets(
        self,
        endpoint: str,
        variables: Dict[str, Any],
        path: List[str],
    ):
        """Query a timeline-style GraphQL endpoint and return its entries.

        Args:
            endpoint: Endpoint path appended to ``https://twitter.com/i/api``.
            variables: GraphQL variables merged over the class defaults.
            path: Candidate keys under the response's ``data`` object; the
                first key whose value is not ``None`` is used as the timeline
                container (the keys are alternatives, not a nested path).
                When empty, ``data.user.result.timeline.timeline`` is used.

        Returns:
            The ``entries`` list of the first ``TimelineAddEntries``
            instruction, or an empty list when there is no such instruction.

        Raises:
            TwitterError: Propagated from :meth:`func`.
            KeyError: If the response lacks the expected container keys.
        """
        merged_variables = {**self._variables, **variables}
        response = await self.func(
            f"https://twitter.com/i/api{endpoint}",
            params={
                "variables": json.dumps(merged_variables),
                "features": json.dumps(self._features["tweets"]),
            },
        )
        payload = response.json()["data"]
        if not path:
            timeline = payload["user"]["result"]["timeline"]["timeline"]
        else:
            timeline = payload
            for key in path:
                candidate = payload.get(key)
                if candidate is not None:
                    timeline = candidate
                    break
        for instruction in timeline["instructions"]:
            if instruction.get("type") == "TimelineAddEntries":
                return instruction["entries"]
        # No entries instruction: return an empty list so callers can iterate
        # the result instead of failing on None.
        return []

    @staticmethod
    def _tweet_from_result(result: Dict[str, Any]) -> "Tweet":
        """Build a ``Tweet`` from a tweet result's ``legacy`` and author ``legacy`` data."""
        return Tweet(
            **result["legacy"],
            user=result["core"]["user_results"]["result"]["legacy"],
        )

    @staticmethod
    def gather_legacy_from_data(entries: List[Dict], filters: str = "tweet-"):
        """Convert timeline entries into ``Tweet`` objects.

        Args:
            entries: Timeline entries as returned by :meth:`pagination_tweets`;
                ``None`` or an empty list yields an empty result.
            filters: Prefix an ``entryId`` must start with to be kept. The
                special value ``"none"`` keeps ``tweet-`` entries and also
                expands the items of ``homeConversation-`` and
                ``conversationthread-`` entries.

        Returns:
            A list of ``Tweet`` objects. Entries without usable tweet data are
            skipped. ``retweet`` / ``quoted`` are set on a tweet when the
            corresponding nested result carries ``legacy`` data.
        """
        tweets: List[Tweet] = []
        filter_entries = []
        for entry in entries or []:
            entry_id: str = entry.get("entryId")
            if not entry_id:
                continue
            if filters == "none":
                if entry_id.startswith("tweet-"):
                    filter_entries.append(entry)
                elif entry_id.startswith(("homeConversation-", "conversationthread-")):
                    filter_entries.extend(entry["content"]["items"])
            elif entry_id.startswith(filters):
                filter_entries.append(entry)
        for entry in filter_entries:
            # Top-level entries keep their payload under "content", expanded
            # conversation items under "item".
            content = entry.get("content") or entry.get("item")
            if not content:
                continue
            tweet = (
                content.get("itemContent", {})
                .get("tweet_results", {})
                .get("result", {})
            )
            # Some results nest the actual tweet one level deeper under "tweet".
            if tweet and tweet.get("tweet"):
                tweet = tweet["tweet"]
            if (not tweet) or (not tweet.get("legacy")):
                continue
            retweet = tweet.get("retweeted_status_result", {}).get("result")
            quoted = tweet.get("quoted_status_result", {}).get("result")
            if retweet and not retweet.get("legacy"):
                retweet = None
            if quoted and not quoted.get("legacy"):
                quoted = None
            parsed = TwitterClient._tweet_from_result(tweet)
            if retweet:
                parsed.retweet = TwitterClient._tweet_from_result(retweet)
            if quoted:
                parsed.quoted = TwitterClient._tweet_from_result(quoted)
            tweets.append(parsed)
        return tweets

    async def tweet_detail(self, tid: int):
        """Fetch the conversation around tweet ``tid`` and return it as ``Tweet`` objects.

        Args:
            tid: ID of the focal tweet.

        Returns:
            The list produced by :meth:`gather_legacy_from_data` with
            ``filters="none"``.

        Raises:
            TwitterError: Propagated from :meth:`func`.
        """
        data = await self.pagination_tweets(
            "/graphql/3XDB26fBve-MmjHaWTUZxA/TweetDetail",
            variables={
                "focalTweetId": tid,
                "with_rux_injections": False,
                "withCommunity": True,
                "withQuickPromoteEligibilityTweetFields": True,
                "withBirdwatchNotes": False,
            },
            path=["threaded_conversation_with_injections"],
        )
        return self.gather_legacy_from_data(data, filters="none")

    async def user_by_screen_name(self, username: str):
        """Look up a user by screen name.

        Args:
            username: The screen name to query.

        Returns:
            A ``User`` built from the ``legacy`` object of the response.

        Raises:
            TwitterError: Propagated from :meth:`func`.
        """
        _variables = {"screen_name": username, "withHighlightedLabel": True}
        data = await self.func(
            "https://twitter.com/i/api/graphql/oUZZZ8Oddwxs8Cd3iW3UEA/UserByScreenName",
            params={
                "variables": json.dumps(_variables),
                "features": json.dumps(self._features["user_by_screen_name"]),
            },
        )
        return User(**data.json()["data"]["user"]["result"]["legacy"])
|
|
|
|
|
|
# Module-level TwitterClient instance, created when this module is imported.
twitter_client = TwitterClient()
|