twitter2telegram/defs/update.py

173 lines
5.7 KiB
Python
Raw Permalink Normal View History

2023-05-24 15:20:21 +00:00
import asyncio
2023-05-25 12:23:38 +00:00
import time
2023-05-24 15:20:21 +00:00
import traceback
2023-05-25 12:23:38 +00:00
from typing import List, Dict
2023-05-24 15:20:21 +00:00
from pyrogram.enums import ParseMode
from pyrogram.errors import FloodWait
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto
from defs.glover import cid, tid, owner
from defs.models import User, Tweet
from init import bot, logs
from defs.sqlite import TweetDB, UserDB
from defs.feed import get_user, UsernameNotFound
def get_button(user: User, tweet: Tweet) -> InlineKeyboardMarkup:
    """Build the inline keyboard attached to a forwarded tweet.

    The keyboard consists of a single row holding two URL buttons:
    "Source" opens ``tweet.url`` and "Author" opens ``user.link``.
    """
    source_button = InlineKeyboardButton("Source", url=tweet.url)
    author_button = InlineKeyboardButton("Author", url=user.link)
    button_row = [source_button, author_button]
    return InlineKeyboardMarkup([button_row])
def get_media_group(text: str, tweet: Tweet) -> List[InputMediaPhoto]:
    """Wrap the tweet's images as photo items for a media group.

    At most the first ten entries of ``tweet.images`` are used. Only the
    first item carries ``text`` as its caption; every later item gets
    ``caption=None``. All items are created with HTML parse mode.
    """
    return [
        InputMediaPhoto(
            photo,
            # Only the first item (position 0) is given the caption text.
            caption=None if position else text,
            parse_mode=ParseMode.HTML,
        )
        for position, photo in enumerate(tweet.images[:10])
    ]
def flood_wait():
    """Create a decorator that retries a coroutine function on ``FloodWait``.

    The decorated coroutine is awaited with the given arguments. When it
    raises ``FloodWait`` a warning is logged, the wrapper sleeps for
    ``e.value + 1`` seconds and calls the function again with the same
    arguments; this repeats until the call succeeds or raises something
    else. Any other exception has its traceback logged and is re-raised
    unchanged to the caller.

    Returns:
        A decorator taking an ``async def`` function and returning the
        retrying wrapper. The wrapper keeps the wrapped function's metadata.
    """
    # Function-scope import so the block does not depend on a file-level one.
    import functools

    def decorator(function):
        @functools.wraps(function)
        async def wrapper(*args, **kwargs):
            # Loop instead of recursing so repeated FloodWaits cannot grow
            # the call stack.
            while True:
                try:
                    return await function(*args, **kwargs)
                except FloodWait as e:
                    logs.warning(f"遇到 FloodWait等待 {e.value} 秒后重试!")
                    await asyncio.sleep(e.value + 1)
                except Exception:
                    # Actually report the traceback (the formatted text used
                    # to be computed and thrown away), then propagate.
                    logs.error(traceback.format_exc())
                    raise

        return wrapper

    return decorator
@flood_wait()
async def send_to_user(user: User, tweet: Tweet):
    """Send one tweet to chat ``cid`` as a reply to message ``tid``.

    The message format depends on how many images the tweet has:

    * no images  -> text message with link preview disabled and the
      Source/Author keyboard;
    * one image  -> photo with the text as caption and the keyboard;
    * more       -> media group built by ``get_media_group`` (no keyboard
      is passed for media groups).

    Returns:
        Whatever the corresponding ``bot.send_*`` call returns, for every
        branch (the media-group branch previously returned ``None``).
    """
    # NOTE(review): tweet.content is placed inside <code> with HTML parse mode
    # without escaping here — confirm it is already escaped upstream.
    text = (
        "<b>Twitter Timeline Update</b>\n\n<code>"
        + tweet.content
        + f"</code>\n\n{user.format} 发表于 {tweet.time_str}"
    )
    if not tweet.images:
        return await bot.send_message(
            cid,
            text,
            disable_web_page_preview=True,
            reply_to_message_id=tid,
            parse_mode=ParseMode.HTML,
            reply_markup=get_button(user, tweet),
        )
    if len(tweet.images) == 1:
        return await bot.send_photo(
            cid,
            tweet.images[0],
            caption=text,
            reply_to_message_id=tid,
            parse_mode=ParseMode.HTML,
            reply_markup=get_button(user, tweet),
        )
    # Return the result here as well so all three branches behave alike.
    return await bot.send_media_group(
        cid,
        get_media_group(text, tweet),
        reply_to_message_id=tid,
    )
@flood_wait()
async def send_username_changed(user: str):
    """Send ``owner`` a message saying that fetching ``user`` failed.

    The message text suggests the username may have changed and that the
    user should be considered for removal.
    """
    await bot.send_message(
        owner,
        f"获取 {user} 的数据失败,可能用户名已改变,请考虑移除该用户",
    )
2023-12-01 15:48:13 +00:00
async def send_check(user_data: User) -> bool:
    """Push every tweet in ``user_data.tweets`` and report whether any existed.

    Each tweet is sent with ``send_to_user``. If sending raises, the failure
    is logged and the tweet id is removed from ``TweetDB`` (presumably so
    the tweet is treated as new again on a later check — confirm).

    Returns:
        ``True`` when ``user_data.tweets`` was non-empty, regardless of
        whether the individual sends succeeded; ``False`` otherwise.
    """
    pending = user_data.tweets
    total = len(pending)
    if total:
        logs.info(f"{user_data.name} (@{user_data.username}) 需要推送 {total} 条推文")
    for item in pending:
        try:
            await send_to_user(user_data, item)
        except Exception:
            logs.error(f"推送 {user_data.name} 的推文 {item.id} 失败")
            TweetDB.remove(user_data.username, item.id)
    return total > 0
2023-05-24 15:20:21 +00:00
2024-01-22 05:57:27 +00:00
def filter_tweets(username: str, data: List[Tweet]) -> List[Tweet]:
    """Return the tweets from ``data`` whose ids are not yet in ``TweetDB``.

    Every tweet that passes the check has its id added to ``TweetDB`` for
    ``username`` before it is collected, so calling this function has the
    side effect of recording the returned tweets.
    """
    fresh: List[Tweet] = []
    for candidate in data:
        if TweetDB.check_id(username, candidate.id):
            # Id already recorded for this user: skip it.
            continue
        TweetDB.add(username, candidate.id)
        fresh.append(candidate)
    return fresh
2023-05-25 12:23:38 +00:00
async def async_get_user(user_data: Dict, username: str) -> None:
    """Fetch ``username`` and store the outcome in ``user_data[username]``.

    The stored value is one of:

    * the object returned by ``get_user`` with its ``tweets`` replaced by
      the result of ``filter_tweets``, when the fetch returned something
      truthy;
    * a ``UsernameNotFound`` instance, when ``get_user`` raised that error;
    * ``None``, when the fetch returned a falsy value or any other
      exception was raised.

    No exception derived from ``Exception`` escapes this coroutine; each
    failure is logged instead.
    """
    try:
        fetched = await get_user(username)
        if not fetched:
            user_data[username] = None
            logs.warning(f"获取 {username} 的数据失败,未知原因")
            return
        fetched.tweets = filter_tweets(username, fetched.tweets)
        user_data[username] = fetched
    except UsernameNotFound:
        logs.warning(f"获取 {username} 的数据失败,可能用户名已改变")
        user_data[username] = UsernameNotFound()
    except Exception:
        logs.error(f"获取 {username} 的数据失败")
        user_data[username] = None
2023-05-24 15:20:21 +00:00
async def check_update():
    """Run one full update pass over every user returned by ``UserDB.get_all``.

    Phase 1 fetches all users through ``async_get_user``, awaiting the
    coroutines in groups of 20 with ``asyncio.gather`` and logging the total
    elapsed time afterwards.

    Phase 2 walks the collected results in order: successful results are
    handed to ``send_check``; ``UsernameNotFound`` markers and any other
    value are logged and counted as failures. A warning is logged when more
    than five users failed.
    """
    logs.info("开始检查更新")
    users = UserDB.get_all()
    users_data = dict.fromkeys(users)
    tasks = [async_get_user(users_data, name) for name in users]

    batch_size = 20
    full_batches, leftover = divmod(len(tasks), batch_size)
    # Number of groups, rounding up when the last group is only partly filled.
    tasks_count = full_batches + 1 if leftover else full_batches
    start_time = time.time()
    for batch_no, offset in enumerate(range(0, len(tasks), batch_size), start=1):
        logs.info(f"开始获取第 {batch_no} / {tasks_count} 组用户的数据")
        await asyncio.gather(*tasks[offset:offset + batch_size])
    logs.info(f"获取数据用时 {time.time() - start_time:.2f}")

    failed_users = []
    # Snapshot the results before iterating, since the loop body awaits.
    entries = list(users_data.items())
    total = len(entries)
    for position, (username, result) in enumerate(entries):
        if isinstance(result, User):
            logs.debug(f"获取 {result.name} 的数据成功,共 {len(result.tweets)} 条推文")
            if not await send_check(result):
                # Nothing was pushed for this user: skip the progress line.
                continue
        elif isinstance(result, UsernameNotFound):
            logs.warning(f"获取 {username} 的数据失败,可能用户名已改变")
            failed_users.append(username)
        else:
            logs.error(f"获取 {username} 的数据失败,未知原因")
            failed_users.append(username)
        logs.info(f"处理完成,剩余 {total - position - 1} 个用户")

    if len(failed_users) > 5:
        logs.warning("失效数据过多,可能 API 失效")
    # Notifying the owner about each failed user via send_username_changed
    # is currently disabled:
    # else:
    #     for user in failed_users:
    #         await send_username_changed(user)
    logs.info("检查更新完成")