diff --git a/.gitignore b/.gitignore index 68bc17f..3b072b5 100644 --- a/.gitignore +++ b/.gitignore @@ -157,4 +157,7 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -#.idea/ +.idea/ +data/ +bot* +defs/glover.py diff --git a/defs/feed.py b/defs/feed.py new file mode 100644 index 0000000..38c2116 --- /dev/null +++ b/defs/feed.py @@ -0,0 +1,64 @@ +import traceback +from datetime import datetime +from typing import List, Optional + +from bs4 import BeautifulSoup + +from init import request +from defs.glover import rss_hub_host +from defs.models import Tweet, User +from feedparser import parse, FeedParserDict + + +class UsernameNotFound(Exception): + pass + + +async def get(username: str): + url = f"{rss_hub_host}/twitter/user/{username}" + response = await request.get(url) + if response.status_code == 200: + return parse(response.text) + elif response.status_code == 404: + raise UsernameNotFound + else: + return None + + +async def parse_tweets(data: List[FeedParserDict]) -> List[Tweet]: + tweets = [] + for tweet in data: + try: + description = tweet.get("description", "") + soup = BeautifulSoup(description, "lxml") + content = soup.get_text() + img_tag = soup.find_all("img") + images = [img.get("src") for img in img_tag if img.get("src")] + url = tweet.get("link", "") + time = datetime.strptime(tweet.get("published", ""), "%a, %d %b %Y %H:%M:%S %Z") + tweets.append( + Tweet( + content=content, + url=url, + time=time, + images=images + ) + ) + except Exception: + traceback.print_exc() + return tweets + + +async def parse_user(username: str, data: FeedParserDict) -> User: + title = data.get("feed", {}).get("title", "") + name = title.replace("Twitter @", "") + tweets = await parse_tweets(data.get("entries", [])) + return User(username=username, name=name, tweets=tweets) + + +async def get_user(username: str) -> Optional[User]: + data = await get(username) + if data: + return await parse_user(username, data) + else: + return None diff --git a/defs/models.py b/defs/models.py new file mode 100644 index 0000000..15329bc --- /dev/null +++ b/defs/models.py @@ -0,0 +1,33 @@ +from datetime import datetime +from typing import List + +from pydantic import BaseModel + + +class Tweet(BaseModel): + content: str + url: str + time: datetime + images: List[str] + + @property + def id(self) -> int: + return int(self.url.split("/")[-1]) + + @property + def time_str(self) -> str: + return self.time.strftime("%Y-%m-%d %H:%M:%S") + + +class User(BaseModel): + username: str + name: str + tweets: List[Tweet] + + @property + def link(self) -> str: + return f"https://twitter.com/{self.username}" + + @property + def format(self) -> str: + return f'{self.name}' diff --git a/defs/sqlite.py b/defs/sqlite.py new file mode 100644 index 0000000..13855b5 --- /dev/null +++ b/defs/sqlite.py @@ -0,0 +1,49 @@ +from pathlib import Path +from typing import List + +from sqlitedict import SqliteDict + +data_path = Path("data") +data_path.mkdir(exist_ok=True) +db_path = data_path / "data.db" +db = SqliteDict(db_path, autocommit=True) + + +class TweetDB: + prefix = "tweet_" + + @staticmethod + def get_all(username: str) -> List[int]: + return db.get(f"{TweetDB.prefix}{username}", []) + + @staticmethod + def check_id(username: str, tid: int) -> bool: + return tid in TweetDB.get_all(username) + + @staticmethod + def add(username: str, tid: int) -> None: + tweets = TweetDB.get_all(username) + tweets.append(tid) + db[f"{TweetDB.prefix}{username}"] = tweets + + +class UserDB: + @staticmethod + def get_all() -> List[str]: + return db.get("users", []) + + @staticmethod + def check(username: str) -> bool: + return username in UserDB.get_all() + + @staticmethod + def add(username: str) -> None: + users = UserDB.get_all() + users.append(username) + db["users"] = users + + @staticmethod + def remove(username: str) -> None: + users = UserDB.get_all() + users.remove(username) + db["users"] = users diff --git a/defs/update.py b/defs/update.py new file mode 100644 index 0000000..a98be2a --- /dev/null +++ b/defs/update.py @@ -0,0 +1,130 @@ +import asyncio +import traceback +from typing import List + +from pyrogram.enums import ParseMode +from pyrogram.errors import FloodWait +from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto + +from defs.glover import cid, tid, owner +from defs.models import User, Tweet +from init import bot, logs +from defs.sqlite import TweetDB, UserDB +from defs.feed import get_user, UsernameNotFound + + +def get_button(user: User, tweet: Tweet) -> InlineKeyboardMarkup: + return InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton("Source", url=tweet.url), + InlineKeyboardButton("Author", url=user.link), + ] + ] + ) + + +def get_media_group(text: str, tweet: Tweet) -> List[InputMediaPhoto]: + data = [] + images = tweet.images[:10] + for idx, image in enumerate(images): + data.append( + InputMediaPhoto( + image, + caption=text if idx == 0 else None, + parse_mode=ParseMode.HTML, + ) + ) + return data + + +def flood_wait(): + def decorator(function): + async def wrapper(*args, **kwargs): + try: + return await function(*args, **kwargs) + except FloodWait as e: + logs.warning(f"遇到 FloodWait,等待 {e.value} 秒后重试!") + await asyncio.sleep(e.value + 1) + return await wrapper(*args, **kwargs) + except Exception as e: + traceback.format_exc() + raise e + + return wrapper + + return decorator + + +@flood_wait() +async def send_to_user(user: User, tweet: Tweet): + text = "Twitter Timeline Update\n\n" + text += tweet.content + text += f"\n\n{user.format} 发表于 {tweet.time_str}" + if not tweet.images: + return await bot.send_message( + cid, + text, + disable_web_page_preview=True, + reply_to_message_id=tid, + parse_mode=ParseMode.HTML, + reply_markup=get_button(user, tweet), + ) + elif len(tweet.images) == 1: + return await bot.send_photo( + cid, + tweet.images[0], + caption=text, + reply_to_message_id=tid, + parse_mode=ParseMode.HTML, + reply_markup=get_button(user, tweet), + ) + else: + await bot.send_media_group( + cid, + get_media_group(text, tweet), + reply_to_message_id=tid, + ) + + +@flood_wait() +async def send_username_changed(user: str): + text = f"获取 {user} 的数据失败,可能用户名已改变" + await bot.send_message(owner, text) + + +async def send_check(user_data: User): + need_send_tweets = [ + tweet for tweet in user_data.tweets + if not TweetDB.check_id(user_data.username, tweet.id) + ] + logs.info(f"需要推送 {len(need_send_tweets)} 条推文") + for tweet in need_send_tweets: + try: + await send_to_user(user_data, tweet) + except Exception: + logs.error(f"推送 {user_data.name} 的推文 {tweet.id} 失败") + traceback.print_exc() + TweetDB.add(user_data.username, tweet.id) + + +async def check_update(): + logs.info("开始检查更新") + users = UserDB.get_all() + nums = len(users) + for idx, user in enumerate(users): + try: + user_data = await get_user(user) + if user_data: + logs.info(f"获取 {user_data.name} 的数据成功,共 {len(user_data.tweets)} 条推文") + await send_check(user_data) + else: + logs.warning(f"获取 {user} 的数据失败,未知原因") + except UsernameNotFound: + logs.warning(f"获取 {user} 的数据失败,可能用户名已改变") + await send_username_changed(user) + UserDB.remove(user) + except Exception: + traceback.print_exc() + logs.info(f"处理完成,剩余 {nums - idx - 1} 个用户") + logs.info("检查更新完成") diff --git a/init.py b/init.py new file mode 100644 index 0000000..603b0d8 --- /dev/null +++ b/init.py @@ -0,0 +1,29 @@ +import pyrogram +import httpx + +from defs.glover import api_id, api_hash +from scheduler import scheduler +from logging import getLogger, INFO, ERROR, StreamHandler, basicConfig +from coloredlogs import ColoredFormatter + +# Enable logging +logs = getLogger("T2G") +logging_format = "%(levelname)s [%(asctime)s] [%(name)s] %(message)s" +logging_handler = StreamHandler() +logging_handler.setFormatter(ColoredFormatter(logging_format)) +root_logger = getLogger() +root_logger.setLevel(ERROR) +root_logger.addHandler(logging_handler) +basicConfig(level=INFO) +logs.setLevel(INFO) + +if not scheduler.running: + scheduler.start() + +bot = pyrogram.Client( + "bot", api_id=api_id, api_hash=api_hash, plugins=dict(root="plugins") +) +headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.72 Safari/537.36" +} +request = httpx.AsyncClient(timeout=60.0, headers=headers) diff --git a/main.py b/main.py new file mode 100644 index 0000000..4e96cfb --- /dev/null +++ b/main.py @@ -0,0 +1,11 @@ +from pyrogram import idle + +from init import logs, bot + + +if __name__ == "__main__": + logs.info("连接服务器中。。。") + bot.start() + logs.info("运行成功!") + idle() + bot.stop() diff --git a/plugins/manage_user.py b/plugins/manage_user.py new file mode 100644 index 0000000..e9dac75 --- /dev/null +++ b/plugins/manage_user.py @@ -0,0 +1,67 @@ +import re + +from pyrogram import filters +from pyrogram.types import Message + +from init import bot + +from defs.sqlite import UserDB +from defs.glover import owner + + +@bot.on_message(filters=filters.command("add_user") & filters.user(owner)) +async def add_user(_, message: Message): + try: + username = message.text.split(" ")[1] + except IndexError: + await message.reply("请指定用户名!") + return + if UserDB.check(username): + await message.reply("该用户添加过了!") + return + UserDB.add(username) + await message.reply("添加成功!") + + +@bot.on_message(filters=filters.regex(r"https://twitter.com/(.*)") & filters.chat(owner)) +async def add_user_regex(_, message: Message): + try: + username = re.findall(r"https://twitter.com/(.*)", message.text)[0] + if not username: + raise IndexError + except IndexError: + await message.reply("请指定用户名!") + return + if UserDB.check(username): + await message.reply("该用户添加过了!") + return + UserDB.add(username) + await message.reply(f"添加 {username} 成功!") + + +@bot.on_message(filters=filters.command("del_user") & filters.user(owner)) +async def del_user(_, message: Message): + try: + username = message.text.split(" ")[1] + except IndexError: + await message.reply("请指定用户名!") + return + if not UserDB.check(username): + await message.reply("该用户未添加!") + return + UserDB.remove(username) + await message.reply(f"删除 {username} 成功!") + + +@bot.on_message(filters=filters.command("list_user") & filters.user(owner)) +async def list_user(_, message: Message): + users = UserDB.get_all() + if not users: + await message.reply("列表为空!") + return + + def format_user(username: str): + return f'{username}' + + text = "\n".join([format_user(user) for user in users]) + await message.reply(f"用户列表:\n{text}") diff --git a/plugins/ping.py b/plugins/ping.py new file mode 100644 index 0000000..fc50303 --- /dev/null +++ b/plugins/ping.py @@ -0,0 +1,8 @@ +from init import bot + +from pyrogram import filters + + +@bot.on_message(filters=filters.command("ping")) +async def ping(_, message): + await message.reply("pong") diff --git a/plugins/update.py b/plugins/update.py new file mode 100644 index 0000000..f093360 --- /dev/null +++ b/plugins/update.py @@ -0,0 +1,21 @@ +from pyrogram.types import Message + +from defs.glover import owner +from init import bot +from scheduler import scheduler + +from pyrogram import filters + +from defs.update import check_update + + +@bot.on_message(filters=filters.command("check_update") & filters.user(owner)) +async def update_all(_, message: Message): + msg = await message.reply("开始检查更新!") + await check_update() + await msg.edit("检查更新完毕!") + + +# @scheduler.scheduled_job("cron", minute="*/30", id="update_all") +# async def update_all_30_minutes(): +# await check_update() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..24bc539 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +pyrogram==2.0.106 +tgcrypto==1.2.5 +httpx==0.24.1 +feedparser +beautifulsoup4 +lxml +coloredlogs +apscheduler +sqlitedict +pydantic diff --git a/scheduler.py b/scheduler.py new file mode 100644 index 0000000..148c970 --- /dev/null +++ b/scheduler.py @@ -0,0 +1,5 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +scheduler = AsyncIOScheduler(timezone="Asia/ShangHai") +if not scheduler.running: + scheduler.start()