diff --git a/defs/bilibili.py b/defs/bilibili.py
index f856f93..cf9b536 100644
--- a/defs/bilibili.py
+++ b/defs/bilibili.py
@@ -64,7 +64,7 @@ def cut_text(old_str, cut):
next_str = next_str[1:]
elif s == "\n":
str_list.append(next_str[: i - 1])
- next_str = next_str[i - 1:]
+ next_str = next_str[i - 1 :]
si = 0
i = 0
continue
@@ -173,9 +173,7 @@ async def binfo_image_create(video_info: dict):
bg_y += title_bg_y
# 简介
- dynamic = (
- "该视频没有简介" if video_info["desc"] == "" else video_info["desc"]
- )
+ dynamic = "该视频没有简介" if video_info["desc"] == "" else video_info["desc"]
dynamic_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18
)
diff --git a/defs/splash.py b/defs/splash.py
index 72c182f..48fd461 100644
--- a/defs/splash.py
+++ b/defs/splash.py
@@ -59,6 +59,7 @@ def retry(func):
logger.warning(f"Sleeping for {e.value}s")
await asyncio.sleep(e.value + 1)
return await func(*args, **kwargs)
+
return wrapper
diff --git a/defs/twitter_api.py b/defs/twitter_api.py
new file mode 100644
index 0000000..5447984
--- /dev/null
+++ b/defs/twitter_api.py
@@ -0,0 +1,136 @@
+from typing import Optional, List
+
+from pyrogram.enums import ParseMode
+from pyrogram.types import (
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+ InputMediaPhoto,
+ InputMediaVideo,
+ InputMediaAnimation,
+)
+
+from init import logs
+from models.apis.twitter.client import twitter_client, TwitterError
+from models.apis.twitter.model import User, Tweet, MediaItem
+
+
+def twitter_link(tweet: Tweet):
+ origin = tweet.retweet_or_quoted
+ button = [
+ [
+ InlineKeyboardButton(
+ text="Source",
+ url=tweet.url,
+ ),
+ InlineKeyboardButton(text="Author", url=tweet.user.url),
+ ]
+ ]
+ if origin:
+ button[0].insert(1, InlineKeyboardButton(text="RSource", url=origin.url))
+ return InlineKeyboardMarkup(button)
+
+
+def twitter_user_link(user: User):
+ return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=user.url)]])
+
+
+def twitter_medias(tweet: Tweet):
+ tweet_media_lists = []
+ if tweet.extended_entities:
+ tweet_media_lists.extend(tweet.extended_entities.media)
+ if tweet.retweet and tweet.retweet.extended_entities:
+ tweet_media_lists.extend(tweet.retweet.extended_entities.media)
+ if tweet.quoted and tweet.quoted.extended_entities:
+ tweet_media_lists.extend(tweet.quoted.extended_entities.media)
+ return tweet_media_lists
+
+
+def twitter_media(tweet_media_lists: List[MediaItem], text: str):
+ media_lists = []
+ for idx, media in enumerate(tweet_media_lists):
+ if len(media_lists) >= 10:
+ break
+ if media.type == "photo":
+ media_lists.append(
+ InputMediaPhoto(
+ media.media_url,
+ caption=text if idx == 0 else None,
+ parse_mode=ParseMode.HTML,
+ )
+ )
+ elif media.type == "gif":
+ media_lists.append(
+ InputMediaAnimation(
+ media.media_url,
+ caption=text if idx == 0 else None,
+ parse_mode=ParseMode.HTML,
+ )
+ )
+ else:
+ media_lists.append(
+ InputMediaVideo(
+ media.media_url,
+ caption=text if idx == 0 else None,
+ parse_mode=ParseMode.HTML,
+ )
+ )
+ return media_lists
+
+
+def get_twitter_user(user: User):
+ user_name = user.name
+ user_username = user.screen_name
+ verified = "💎" if user.verified else ""
+ protected = "🔒" if user.protected else ""
+ text = (
+ f"Twitter User Info\n\n"
+ f"Name: {verified}{protected}{user_name}
\n"
+ f'Username: @{user_username}\n'
+ f"Bio: {user.description}
\n"
+ f"Joined: {user.created.strftime('%Y-%m-%d %H:%M:%S')}
\n"
+ f"📤 {user.statuses_count} ❤️{user.favourites_count} "
+ f"粉丝 {user.followers_count} 关注 {user.friends_count}"
+ )
+ return text
+
+
+def get_twitter_status(tweet: Tweet):
+ text = tweet.full_text or "暂 无 内 容"
+ text = f"{text}
"
+ final_text = "Twitter Status Info\n\n" f"{text}\n\n"
+ if tweet.retweet_or_quoted:
+ roq = tweet.retweet_or_quoted
+ final_text += (
+ f'RT: {roq.full_text or "暂 无 内 容"}
\n\n'
+ f'{roq.user.one_line} 发表于 {roq.created.strftime("%Y-%m-%d %H:%M:%S")}'
+ f"\n👍 {roq.favorite_count} 🔁 {roq.retweet_count}\n"
+ f'{tweet.user.one_line} 转于 {tweet.created.strftime("%Y-%m-%d %H:%M:%S")}\n'
+ f"👍 {tweet.favorite_count} 🔁 {tweet.retweet_count}"
+ )
+ else:
+ final_text += (
+ f'{tweet.user.one_line} 发表于 {tweet.created.strftime("%Y-%m-%d %H:%M:%S")}'
+ f"\n👍 {tweet.favorite_count} 🔁 {tweet.retweet_count}"
+ )
+ return final_text
+
+
+async def fetch_tweet(tweet_id: int) -> Optional[Tweet]:
+ try:
+ tweet = await twitter_client.tweet_detail(tweet_id)
+ for t in tweet:
+ if t.id_str == str(tweet_id):
+ return t
+ return tweet[0]
+ except TwitterError as e:
+ logs.error(f"Twitter Error: {e}")
+ return None
+
+
+async def fetch_user(username: str) -> Optional[User]:
+ try:
+ user = await twitter_client.user_by_screen_name(username)
+ except TwitterError as e:
+ logs.error(f"Twitter Error: {e}")
+ return None
+ return user
diff --git a/models/__init__.py b/models/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/models/apis/__init__.py b/models/apis/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/models/apis/splash.py b/models/apis/splash.py
index 363363e..8348c53 100644
--- a/models/apis/splash.py
+++ b/models/apis/splash.py
@@ -53,14 +53,18 @@ class Splash(BaseModel):
return ""
if not self.game_short_name:
return ""
- return f"https://www.miyoushe.com/{self.game_short_name}/article/{self.article_id}"
+ return (
+ f"https://www.miyoushe.com/{self.game_short_name}/article/{self.article_id}"
+ )
@property
def text(self) -> str:
- return f"#id{self.id} \n" \
- f"ID:{self.id}
\n" \
- f"所属分区:{self.game_id} - {self.game_short_name}
\n" \
- f"开始时间:{self.online_time_str}
\n" \
- f"结束时间:{self.offline_time_str}
\n" \
- f"链接: {self.splash_image}\n" \
- f"文章链接: {self.article_url}"
+ return (
+ f"#id{self.id} \n"
+ f"ID:{self.id}
\n"
+ f"所属分区:{self.game_id} - {self.game_short_name}
\n"
+ f"开始时间:{self.online_time_str}
\n"
+ f"结束时间:{self.offline_time_str}
\n"
+ f"链接: {self.splash_image}\n"
+ f"文章链接: {self.article_url}"
+ )
diff --git a/models/apis/twitter/__init__.py b/models/apis/twitter/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/models/apis/twitter/client.py b/models/apis/twitter/client.py
new file mode 100644
index 0000000..5afd434
--- /dev/null
+++ b/models/apis/twitter/client.py
@@ -0,0 +1,259 @@
+import base64
+import json
+import random
+from typing import Dict, Any, List
+
+from httpx import AsyncClient, Cookies
+
+from .model import Tweet, User
+
+
+class TwitterError(Exception):
+ def __init__(self, status_code: int, message: str):
+ self.status_code = status_code
+ self.message = message
+
+
+class TwitterClient:
+ headers = {
+ "authorization": "",
+ "x-guest-token": "",
+ "x-twitter-auth-type": "",
+ "x-twitter-client-language": "en",
+ "x-twitter-active-user": "yes",
+ "x-csrf-token": "",
+ "Referer": "https://twitter.com/",
+ }
+ tokens = ["CjulERsDeqhhjSme66ECg:IQWdVyqFxghAtURHGeGiWAsmCAGmdW3WmbEx6Hck"]
+
+ _variables = {
+ "count": 20,
+ "includePromotedContent": False,
+ "withSuperFollowsUserFields": True,
+ "withBirdwatchPivots": False,
+ "withDownvotePerspective": False,
+ "withReactionsMetadata": False,
+ "withReactionsPerspective": False,
+ "withSuperFollowsTweetFields": True,
+ "withClientEventToken": False,
+ "withBirdwatchNotes": False,
+ "withVoice": True,
+ "withV2Timeline": False,
+ "__fs_interactive_text": False,
+ "__fs_dont_mention_me_view_api_enabled": False,
+ }
+ _features = {
+ "tweets": {
+ "rweb_lists_timeline_redesign_enabled": True,
+ "responsive_web_graphql_exclude_directive_enabled": True,
+ "verified_phone_label_enabled": False,
+ "creator_subscriptions_tweet_preview_api_enabled": True,
+ "responsive_web_graphql_timeline_navigation_enabled": True,
+ "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
+ "tweetypie_unmention_optimization_enabled": True,
+ "responsive_web_edit_tweet_api_enabled": True,
+ "graphql_is_translatable_rweb_tweet_is_translatable_enabled": True,
+ "view_counts_everywhere_api_enabled": True,
+ "longform_notetweets_consumption_enabled": True,
+ "responsive_web_twitter_article_tweet_consumption_enabled": False,
+ "tweet_awards_web_tipping_enabled": False,
+ "freedom_of_speech_not_reach_fetch_enabled": True,
+ "standardized_nudges_misinfo": True,
+ "tweet_with_visibility_results_prefer_gql_limited_actions_policy_enabled": True,
+ "longform_notetweets_rich_text_read_enabled": True,
+ "longform_notetweets_inline_media_enabled": True,
+ "responsive_web_media_download_video_enabled": False,
+ "responsive_web_enhance_cards_enabled": False,
+ },
+ "user_by_screen_name": {
+ "hidden_profile_likes_enabled": False,
+ "responsive_web_graphql_exclude_directive_enabled": True,
+ "verified_phone_label_enabled": False,
+ "subscriptions_verification_info_verified_since_enabled": True,
+ "highlights_tweets_tab_ui_enabled": True,
+ "creator_subscriptions_tweet_preview_api_enabled": True,
+ "responsive_web_graphql_skip_user_profile_image_extensions_enabled": False,
+ "responsive_web_graphql_timeline_navigation_enabled": True,
+ },
+ }
+
+ def __init__(self):
+ self.cookie = Cookies()
+ self.client = AsyncClient(cookies=self.cookie, timeout=60.0)
+ self.inited = False
+
+ async def request(
+ self, url: str, method: str = "GET", headers: Dict[str, str] = None, **kwargs
+ ):
+ headers = headers or self.headers
+ headers = {key: value for key, value in headers.items() if value is not None}
+ return await self.client.request(method, url, headers=headers, **kwargs)
+
+ async def reset_session(self):
+ self.headers[
+ "authorization"
+ ] = f"Basic {base64.b64encode(self.tokens[0].encode()).decode()}"
+ response = await self.request(
+ "https://api.twitter.com/oauth2/token",
+ method="POST",
+ params={"grant_type": "client_credentials"},
+ )
+ access_token = response.json()["access_token"]
+ self.headers["authorization"] = f"Bearer {access_token}"
+ # 生成csrf - token
+ # 32个随机十六进制字符
+ csrf_token = "".join([random.choice("0123456789abcdef") for _ in range(32)])
+ self.cookie.set("ct0", csrf_token, domain=".twitter.com")
+ self.headers["x-csrf-token"] = csrf_token
+ self.headers["x-guest-token"] = ""
+ # 发起初始化请求
+ response = await self.request(
+ "https://api.twitter.com/1.1/guest/activate.json",
+ method="POST",
+ )
+ # 获取guest - token
+ guest_token = response.json()["guest_token"]
+ self.headers["x-guest-token"] = guest_token
+ self.cookie.set("gt", guest_token, domain=".twitter.com")
+ # 发起第二个初始化请求, 获取_twitter_sess
+ await self.request(
+ "https://twitter.com/i/js_inst",
+ method="GET",
+ params={"c_name": "ui_metrics"},
+ )
+
+ async def func(self, url: str, method: str = "GET", **kwargs):
+ if not self.inited:
+ await self.reset_session()
+ self.inited = True
+ response = await self.request(
+ url,
+ method=method,
+ **kwargs,
+ )
+ if response.status_code == 403:
+ await self.reset_session()
+ response = await self.request(
+ url,
+ method=method,
+ **kwargs,
+ )
+ if response.status_code != 200:
+ raise TwitterError(response.status_code, response.text)
+ csrf_token = response.cookies.get("ct0")
+ if csrf_token:
+ self.headers["x-csrf-token"] = csrf_token
+ return response
+
+ async def pagination_tweets(
+ self,
+ endpoint: str,
+ variables: Dict[str, Any],
+ path: List[str],
+ ):
+ _variables = self._variables.copy()
+ _variables.update(variables)
+ data = await self.func(
+ f"https://twitter.com/i/api{endpoint}",
+ params={
+ "variables": json.dumps(_variables),
+ "features": json.dumps(self._features["tweets"]),
+ },
+ )
+ if not path:
+ instructions = data.json()["data"]["user"]["result"]["timeline"][
+ "timeline"
+ ]["instructions"]
+ else:
+ instructions = data.json()["data"]
+ for key in path:
+ _instructions = instructions.get(key)
+ if _instructions is not None:
+ instructions = _instructions
+ break
+ instructions = instructions["instructions"]
+ for i in instructions:
+ if i["type"] == "TimelineAddEntries":
+ return i["entries"]
+
+ @staticmethod
+ def gather_legacy_from_data(entries: List[Dict], filters: str = "tweet-"):
+ tweets: List[Tweet] = []
+ filter_entries = []
+ for entry in entries:
+ entry_id: str = entry.get("entryId")
+ if entry_id:
+ if filters == "none":
+ if entry_id.startswith("tweet-"):
+ filter_entries.append(entry)
+ elif entry_id.startswith(
+ "homeConversation-"
+ ) or entry_id.startswith("conversationthread-"):
+ filter_entries.extend(entry["content"]["items"])
+ else:
+ if entry_id.startswith(filters):
+ filter_entries.append(entry)
+ for entry in filter_entries:
+ retweet = None
+ quoted = None
+ content = entry.get("content") or entry.get("item")
+ tweet = (
+ content.get("itemContent", {})
+ .get("tweet_results", {})
+ .get("result", {})
+ )
+ if tweet and tweet.get("tweet"):
+ tweet = tweet["tweet"]
+ if tweet:
+ retweet = tweet.get("retweeted_status_result", {}).get("result")
+ quoted = tweet.get("quoted_status_result", {}).get("result")
+ if (not tweet) or (not tweet.get("legacy")):
+ continue
+ if retweet and not retweet.get("legacy"):
+ retweet = None
+ if quoted and not quoted.get("legacy"):
+ quoted = None
+ tweet = Tweet(
+ **tweet["legacy"],
+ **{"user": tweet["core"]["user_results"]["result"]["legacy"]},
+ )
+ if retweet:
+ tweet.retweet = Tweet(
+ **retweet["legacy"],
+ **{"user": retweet["core"]["user_results"]["result"]["legacy"]},
+ )
+ if quoted:
+ tweet.quoted = Tweet(
+ **quoted["legacy"],
+ **{"user": quoted["core"]["user_results"]["result"]["legacy"]},
+ )
+ tweets.append(tweet)
+ return tweets
+
+ async def tweet_detail(self, tid: int):
+ data = await self.pagination_tweets(
+ "/graphql/3XDB26fBve-MmjHaWTUZxA/TweetDetail",
+ variables={
+ "focalTweetId": tid,
+ "with_rux_injections": False,
+ "withCommunity": True,
+ "withQuickPromoteEligibilityTweetFields": True,
+ "withBirdwatchNotes": False,
+ },
+ path=["threaded_conversation_with_injections"],
+ )
+ return self.gather_legacy_from_data(data, filters="none")
+
+ async def user_by_screen_name(self, username: str):
+ _variables = {"screen_name": username, "withHighlightedLabel": True}
+ data = await self.func(
+ "https://twitter.com/i/api/graphql/oUZZZ8Oddwxs8Cd3iW3UEA/UserByScreenName",
+ params={
+ "variables": json.dumps(_variables),
+ "features": json.dumps(self._features["user_by_screen_name"]),
+ },
+ )
+ return User(**data.json()["data"]["user"]["result"]["legacy"])
+
+
+twitter_client = TwitterClient()
diff --git a/models/apis/twitter/model.py b/models/apis/twitter/model.py
new file mode 100644
index 0000000..3cdd6ab
--- /dev/null
+++ b/models/apis/twitter/model.py
@@ -0,0 +1,127 @@
+from datetime import datetime, timedelta
+from typing import List, Optional
+
+from pydantic import BaseModel
+
+
+class VideoInfoVariant(BaseModel):
+ bitrate: Optional[int]
+ content_type: str
+ url: str
+
+
+class VideoInfo(BaseModel):
+ variants: List[VideoInfoVariant]
+
+ @property
+ def best_variant(self) -> VideoInfoVariant:
+ variants = [
+ i for i in self.variants if i.content_type.startswith("video/mp4")
+ ] or self.variants
+ return max(variants, key=lambda x: x.bitrate or 0)
+
+
+class MediaItem(BaseModel):
+ display_url: str
+ expanded_url: str
+ id_str: str
+ media_url_https: str
+ """ 真实链接 """
+ type: str
+ """ 类型 """
+ url: str
+ """ 短链接 """
+ video_info: Optional[VideoInfo]
+ """ 视频信息 """
+
+ @property
+ def media_url(self):
+ if self.type == "photo":
+ ext = self.media_url_https.split(".")[-1]
+ return f"{self.media_url_https[:-(len(ext) + 1)]}?format={ext}&name=orig"
+ elif self.type == "video":
+ return self.video_info.best_variant.url
+ return self.media_url_https
+
+
+class ExtendedEntities(BaseModel):
+ media: Optional[List[MediaItem]]
+
+
+class User(BaseModel):
+ created_at: str
+ description: str
+ statuses_count: int = 0
+ favourites_count: int = 0
+ followers_count: int = 0
+ """ 关注者 """
+ friends_count: int = 0
+ """ 正在关注 """
+ location: str
+ name: str
+ profile_banner_url: Optional[str]
+ profile_image_url_https: str
+ screen_name: str
+ verified: bool = False
+ protected: bool = False
+
+ @property
+ def created(self) -> datetime:
+ """入推时间"""
+ # 'Fri Jun 30 12:08:56 +0000 2023'
+ return datetime.strptime(
+ self.created_at, "%a %b %d %H:%M:%S %z %Y"
+ ) + timedelta(hours=8)
+
+ @property
+ def icon(self) -> str:
+ """头像"""
+ return self.profile_image_url_https.replace("_normal.jpg", ".jpg")
+
+ @property
+ def url(self) -> str:
+ """链接"""
+ return f"https://twitter.com/{self.screen_name}"
+
+ @property
+ def one_line(self) -> str:
+ verified = "" if not self.verified else "💎"
+ protected = "" if not self.protected else "🔒"
+ return f'{verified}{protected}{self.screen_name}'
+
+
+class Tweet(BaseModel):
+ bookmark_count: int
+ """ 书签次数 """
+ created_at: str
+ conversation_id_str: str
+ extended_entities: Optional[ExtendedEntities]
+ favorite_count: int
+ """ 喜欢次数 """
+ full_text: str
+ quote_count: int
+ """ 引用 """
+ reply_count: int
+ """ 回复 """
+ retweet_count: int
+ """ 转推 """
+ id_str: str
+ """ tweet id """
+ user: User
+ retweet: "Tweet" = None
+ quoted: "Tweet" = None
+
+ @property
+ def created(self) -> datetime:
+ # 'Fri Jun 30 12:08:56 +0000 2023'
+ return datetime.strptime(
+ self.created_at, "%a %b %d %H:%M:%S %z %Y"
+ ) + timedelta(hours=8)
+
+ @property
+ def url(self) -> str:
+ return f"https://twitter.com/{self.user.screen_name}/status/{self.id_str}"
+
+ @property
+ def retweet_or_quoted(self) -> "Tweet":
+ return self.retweet or self.quoted
diff --git a/modules/banme.py b/modules/banme.py
index 4e603ff..e7dbf64 100644
--- a/modules/banme.py
+++ b/modules/banme.py
@@ -5,18 +5,20 @@ import random
from datetime import datetime, timedelta
from pyrogram import Client, filters
from pyrogram.enums import ChatMemberStatus
-from pyrogram.types import Message, ChatPermissions, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
+from pyrogram.types import (
+ Message,
+ ChatPermissions,
+ CallbackQuery,
+ InlineKeyboardMarkup,
+ InlineKeyboardButton,
+)
from init import bot
from scheduler import reply_message
def gen_cancel_button(uid: int):
return InlineKeyboardMarkup(
- [
- [
- InlineKeyboardButton(text="别口球我!", callback_data=f"banme_cancel_{uid}")
- ]
- ]
+ [[InlineKeyboardButton(text="别口球我!", callback_data=f"banme_cancel_{uid}")]]
)
@@ -37,7 +39,7 @@ async def ban_me_command(client: Client, message: Message):
# 检查bot和用户身份
if (
- await client.get_chat_member(message.chat.id, "self")
+ await client.get_chat_member(message.chat.id, "self")
).status != ChatMemberStatus.ADMINISTRATOR:
await message.reply("Bot非群管理员, 无法执行禁言操作QAQ")
return
@@ -64,12 +66,12 @@ async def ban_me_command(client: Client, message: Message):
ChatPermissions(),
datetime.now() + timedelta(seconds=act_time),
)
- await reply_message(message, msg, reply_markup=gen_cancel_button(message.from_user.id))
+ await reply_message(
+ message, msg, reply_markup=gen_cancel_button(message.from_user.id)
+ )
-@bot.on_callback_query(
- filters.regex(r"^banme_cancel_(\d+)$")
-)
+@bot.on_callback_query(filters.regex(r"^banme_cancel_(\d+)$"))
async def ban_me_cancel(client: Client, callback_query: CallbackQuery):
if not callback_query.from_user:
return
diff --git a/modules/geo.py b/modules/geo.py
index 6195690..cd40982 100644
--- a/modules/geo.py
+++ b/modules/geo.py
@@ -10,9 +10,7 @@ from init import request, bot
REQUEST_URL = f"https://restapi.amap.com/v3/geocode/geo?key={amap_key}&"
-@bot.on_message(
- filters.incoming & filters.command(["geo", f"geo@{bot.me.username}"])
-)
+@bot.on_message(filters.incoming & filters.command(["geo", f"geo@{bot.me.username}"]))
async def geo_command(_: Client, message: Message):
if len(message.command) <= 1:
await message.reply("没有找到要查询的中国 经纬度/地址 ...")
diff --git a/modules/start.py b/modules/start.py
index 9273513..4126ea4 100644
--- a/modules/start.py
+++ b/modules/start.py
@@ -1,5 +1,10 @@
from pyrogram import Client, filters
-from pyrogram.types import Message, InlineQuery, InlineQueryResultArticle, InputTextMessageContent
+from pyrogram.types import (
+ Message,
+ InlineQuery,
+ InlineQueryResultArticle,
+ InputTextMessageContent,
+)
from defs.button import gen_button, Button
from init import bot
diff --git a/modules/twitter_api.py b/modules/twitter_api.py
new file mode 100644
index 0000000..0d041af
--- /dev/null
+++ b/modules/twitter_api.py
@@ -0,0 +1,118 @@
+from urllib.parse import urlparse
+
+from pyrogram import Client, filters, ContinuePropagation
+from pyrogram.enums import MessageEntityType
+from pyrogram.types import Message
+
+from defs.twitter_api import (
+ fetch_tweet,
+ get_twitter_status,
+ twitter_link,
+ twitter_media,
+ fetch_user,
+ get_twitter_user,
+ twitter_user_link,
+ twitter_medias,
+)
+from models.apis.twitter.model import MediaItem
+
+
+async def send_single_tweet(message: Message, media: MediaItem, text: str, button):
+ if media.type == "photo":
+ await message.reply_photo(
+ media.media_url,
+ quote=True,
+ caption=text,
+ reply_markup=button,
+ )
+ elif media.type == "video":
+ await message.reply_video(
+ media.media_url,
+ quote=True,
+ caption=text,
+ reply_markup=button,
+ )
+ elif media.type == "gif":
+ await message.reply_animation(
+ media.media_url,
+ quote=True,
+ caption=text,
+ reply_markup=button,
+ )
+ else:
+ await message.reply_document(
+ media.media_url,
+ quote=True,
+ caption=text,
+ reply_markup=button,
+ )
+
+
+async def process_status(message: Message, status: str):
+ try:
+ status = int(status)
+ except ValueError:
+ return
+ tweet = await fetch_tweet(status)
+ if not tweet:
+ return
+ text = get_twitter_status(tweet)
+ button = twitter_link(tweet)
+ medias = twitter_medias(tweet)
+ if len(medias) == 1:
+ media = medias[0]
+ await send_single_tweet(message, media, text, button)
+ return
+ media_lists = twitter_media(medias, text)
+ if media_lists:
+ await message.reply_media_group(media_lists, quote=True)
+ else:
+ await message.reply(text, quote=True, reply_markup=button)
+
+
+async def process_user(message: Message, username: str):
+ user = await fetch_user(username)
+ if not user:
+ return
+ text = get_twitter_user(user)
+ button = twitter_user_link(user)
+ await message.reply_photo(
+ user.icon,
+ caption=text,
+ quote=True,
+ reply_markup=button,
+ )
+
+
+@Client.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/"))
+async def twitter_share(_: Client, message: Message):
+ if not message.text:
+ return
+ for num in range(len(message.entities)):
+ entity = message.entities[num]
+ if entity.type == MessageEntityType.URL:
+ url = message.text[entity.offset : entity.offset + entity.length]
+ elif entity.type == MessageEntityType.TEXT_LINK:
+ url = entity.url
+ else:
+ continue
+ url = urlparse(url)
+ if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]:
+ if url.path.find("status") >= 0:
+ status_id = str(
+ url.path[url.path.find("status") + 7 :].split("/")[0]
+ ).split("?")[0]
+ try:
+ await process_status(message, status_id)
+ except Exception as e:
+ print(e)
+ elif url.path == "/":
+ return
+ else:
+ # 解析用户
+ uid = url.path.replace("/", "")
+ try:
+ await process_user(message, uid)
+ except Exception as e:
+ print(e)
+ raise ContinuePropagation
diff --git a/scheduler.py b/scheduler.py
index a61db79..8923c93 100644
--- a/scheduler.py
+++ b/scheduler.py
@@ -30,7 +30,7 @@ def add_delete_message_job(message: Message, delete_seconds: int = 60):
name=f"{message.chat.id}|{message.id}|delete_message",
args=[message],
run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
- + datetime.timedelta(seconds=delete_seconds),
+ + datetime.timedelta(seconds=delete_seconds),
replace_existing=True,
)
@@ -43,12 +43,14 @@ def add_delete_file_job(path: str, delete_seconds: int = 3600):
name=f"{hash(path)}|delete_file",
args=[path],
run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
- + datetime.timedelta(seconds=delete_seconds),
+ + datetime.timedelta(seconds=delete_seconds),
replace_existing=True,
)
-async def reply_message(msg: Message, text: str, delete_origin: bool = True, *args, **kwargs):
+async def reply_message(
+ msg: Message, text: str, delete_origin: bool = True, *args, **kwargs
+):
reply_msg = await msg.reply(text, *args, **kwargs)
add_delete_message_job(reply_msg)
if delete_origin: