iShotaBot/modules/twitter_api.py
2023-10-02 16:51:46 +08:00

164 lines
4.4 KiB
Python

import contextlib
from typing import Optional
from urllib.parse import urlparse
from pydantic import BaseModel
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType, ChatType
from pyrogram.types import Message
from defs.fix_twitter_api import (
fetch_tweet,
get_twitter_status,
twitter_link,
twitter_media,
fetch_user,
get_twitter_user,
twitter_user_link,
twitter_medias,
)
from init import bot
from models.apis.fxtwitter.model import FixTweetMedia
class Reply(BaseModel):
cid: int
mid: Optional[int] = None
async def send_single_tweet(reply: Reply, media: FixTweetMedia, text: str, button):
if media.type == "photo":
await bot.send_photo(
reply.cid,
media.url,
caption=text,
reply_markup=button,
reply_to_message_id=reply.mid,
)
elif media.type == "video":
await bot.send_video(
reply.cid,
media.url,
caption=text,
reply_markup=button,
reply_to_message_id=reply.mid,
)
elif media.type == "gif":
await bot.send_animation(
reply.cid,
media.url,
caption=text,
reply_markup=button,
reply_to_message_id=reply.mid,
)
else:
await bot.reply_document(
reply.cid,
media.url,
caption=text,
reply_markup=button,
reply_to_message_id=reply.mid,
)
async def process_status(reply: Reply, status: str):
try:
status = int(status)
except ValueError:
return
tweet = await fetch_tweet(status)
if not tweet:
return
text = get_twitter_status(tweet)
button = twitter_link(tweet)
medias = twitter_medias(tweet)
if len(medias) == 1:
media = medias[0]
await send_single_tweet(reply, media, text, button)
return
media_lists = twitter_media(medias, text)
if media_lists:
await bot.send_media_group(
reply.cid,
media_lists,
reply_to_message_id=reply.mid,
)
else:
await bot.send_message(
reply.cid,
text,
reply_markup=button,
reply_to_message_id=reply.mid,
)
async def process_user(reply: Reply, username: str):
user = await fetch_user(username)
if not user:
return
text = get_twitter_user(user)
button = twitter_user_link(user)
await bot.send_photo(
reply.cid,
user.icon,
caption=text,
reply_markup=button,
reply_to_message_id=reply.mid,
)
async def process_url(url: str, reply: Reply):
url = urlparse(url)
if url.hostname and url.hostname in [
"twitter.com",
"vxtwitter.com",
"fxtwitter.com",
"x.com",
]:
if url.path.find("status") >= 0:
status_id = str(
url.path[url.path.find("status") + 7 :].split("/")[0]
).split("?")[0]
try:
await process_status(reply, status_id)
except Exception as e:
print(e)
elif url.path == "/":
return
else:
# 解析用户
uid = url.path.replace("/", "")
try:
await process_user(reply, uid)
except Exception as e:
print(e)
@bot.on_message(filters.incoming & filters.text & filters.regex(r"(x|twitter).com/"))
async def twitter_share(_: Client, message: Message):
if not message.text:
return
if (
message.sender_chat
and message.forward_from_chat
and message.sender_chat.id == message.forward_from_chat.id
):
# 过滤绑定频道的转发
return
mid = message.id
if message.text.startswith("del") and message.chat.type == ChatType.CHANNEL:
with contextlib.suppress(Exception):
await message.delete()
mid = None
reply = Reply(cid=message.chat.id, mid=mid)
for num in range(len(message.entities)):
entity = message.entities[num]
if entity.type == MessageEntityType.URL:
url = message.text[entity.offset : entity.offset + entity.length]
elif entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
else:
continue
await process_url(url, reply)
raise ContinuePropagation