mirror of
https://github.com/Xtao-Labs/twitter2telegram.git
synced 2024-11-23 08:20:54 +00:00
132 lines
3.6 KiB
Python
132 lines
3.6 KiB
Python
import traceback
|
|
from asyncio import Lock
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from bs4 import BeautifulSoup
|
|
from httpx import RequestError
|
|
|
|
from init import request
|
|
from defs.glover import rss_hub_host, nitter_host
|
|
from defs.models import Tweet, User
|
|
from feedparser import parse, FeedParserDict
|
|
|
|
# Module-level asyncio lock; nitter_get acquires it around its HTTP request.
LOCK = Lock()
|
|
|
|
|
|
class UsernameNotFound(Exception):
    """Signal that no feed could be obtained for the requested username.

    Raised by the feed fetchers when a host answers with HTTP 404, and by
    ``get_user`` when the Nitter lookup ends in ``HostNeedChange``.
    """
|
|
|
|
|
|
class HostNeedChange(Exception):
    """Signal that the current feed host failed and another should be tried.

    Raised by the feed fetchers when a host answers with a status code other
    than 200 or 404.
    """
|
|
|
|
|
|
def retry(func):
    """Decorate an async callable so that it is attempted up to three times.

    A ``HostNeedChange`` or ``RequestError`` raised by ``func`` triggers
    another attempt; any other exception propagates immediately.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper carrying ``func``'s metadata. It returns ``func``'s
        result from the first attempt that succeeds. If the last attempt
        raises ``HostNeedChange`` that exception is re-raised; if the last
        attempt raises ``RequestError`` the wrapper returns ``None``.
    """
    # Local import so the decorator does not depend on a file-level import.
    from functools import wraps

    max_attempts = 3

    @wraps(func)  # keep the wrapped coroutine's __name__ / __doc__
    async def wrapper(*args, **kwargs):
        for attempt in range(max_attempts):
            try:
                return await func(*args, **kwargs)
            except HostNeedChange:
                if attempt == max_attempts - 1:
                    # Re-raise the original exception instead of building a
                    # fresh one, so its traceback and arguments are kept.
                    raise
            except RequestError:
                # Request-level failure: simply try again.
                continue
        # Every attempt ended in RequestError; callers treat None as "no data".
        return None

    return wrapper
|
|
|
|
|
|
@retry
async def rsshub_get(username: str, host: str) -> Optional[FeedParserDict]:
    """Fetch and parse a user's feed from one RSSHub host.

    Requests ``{host}/twitter/user/{username}`` through the shared
    ``request`` client.

    Returns:
        The feed parsed from the response text when the host answers 200.

    Raises:
        UsernameNotFound: The host answered 404.
        HostNeedChange: The host answered with any other status code.
    """
    response = await request.get(f"{host}/twitter/user/{username}")
    status = response.status_code
    if status == 404:
        raise UsernameNotFound
    if status != 200:
        raise HostNeedChange
    return parse(response.text)
|
|
|
|
|
|
@retry
async def nitter_get(username: str, host: str) -> Optional[FeedParserDict]:
    """Fetch and parse a user's feed from one Nitter host.

    Requests ``{host}/{username}/rss`` through the shared ``request`` client
    while holding the module-level ``LOCK``.

    Returns:
        The feed parsed from the response text when the host answers 200.

    Raises:
        UsernameNotFound: The host answered 404.
        HostNeedChange: The host answered with any other status code.
    """
    feed_url = f"{host}/{username}/rss"
    async with LOCK:
        response = await request.get(feed_url)
    status = response.status_code
    if status == 404:
        raise UsernameNotFound
    if status != 200:
        raise HostNeedChange
    return parse(response.text)
|
|
|
|
|
|
async def parse_tweets(data: List[FeedParserDict]) -> List[Tweet]:
    """Convert feed entries into ``Tweet`` objects.

    For each entry the ``description`` HTML is parsed with BeautifulSoup to
    obtain the text content and the ``src`` of every ``<img>`` tag, ``link``
    becomes ``old_url`` and ``published`` is parsed into a ``datetime``.

    An entry that raises any exception during conversion is skipped after
    its traceback has been printed.

    Returns:
        The successfully converted tweets, in input order.
    """
    published_format = "%a, %d %b %Y %H:%M:%S %Z"
    parsed: List[Tweet] = []
    for entry in data:
        try:
            html = BeautifulSoup(entry.get("description", ""), "lxml")
            text = html.get_text()
            sources = [tag.get("src") for tag in html.find_all("img")]
            published_at = datetime.strptime(
                entry.get("published", ""), published_format
            )
            parsed.append(
                Tweet(
                    content=text,
                    old_url=entry.get("link", ""),
                    time=published_at,
                    # Drop <img> tags that carry no (or an empty) src.
                    images=[src for src in sources if src],
                )
            )
        except Exception:
            # Best effort: report the failure and move on to the next entry.
            traceback.print_exc()
    return parsed
|
|
|
|
|
|
async def parse_user(username: str, data: FeedParserDict) -> User:
    """Build a ``User`` from a parsed feed.

    The display name is the feed title with every occurrence of the literal
    ``"Twitter @"`` removed; the tweets come from running the feed's
    ``entries`` through ``parse_tweets``.
    """
    feed_info = data.get("feed", {})
    display_name = feed_info.get("title", "").replace("Twitter @", "")
    entries = data.get("entries", [])
    user_tweets = await parse_tweets(entries)
    return User(username=username, name=display_name, tweets=user_tweets)
|
|
|
|
|
|
async def get_user(username: str) -> Optional[User]:
    """Look a user up via RSSHub first, falling back to Nitter.

    Returns:
        The RSSHub result when it is truthy; otherwise whatever the Nitter
        lookup returns (which may be ``None``).

    Raises:
        UsernameNotFound: The Nitter lookup raised ``HostNeedChange``, or
            either lookup raised ``UsernameNotFound`` itself.
    """
    try:
        user = await get_user_rsshub(username)
    except HostNeedChange:
        # RSSHub hosts failed; fall through to Nitter.
        user = None
    if user:
        return user

    try:
        return await get_user_nitter(username)
    except HostNeedChange:
        raise UsernameNotFound
|
|
|
|
|
|
async def get_user_rsshub(username: str) -> Optional[User]:
    """Try each configured RSSHub host in order until one yields a feed.

    Returns:
        The ``User`` built from the first host that returns a truthy feed,
        or ``None`` when no host produced one without the final host raising.

    Raises:
        HostNeedChange: The last configured host raised ``HostNeedChange``.
        UsernameNotFound: Propagated unchanged from ``rsshub_get``.
    """
    last_index = len(rss_hub_host) - 1
    for index, host in enumerate(rss_hub_host):
        try:
            data = await rsshub_get(username, host)
        except HostNeedChange:
            # Compare by position, not by value: a host string that also
            # appears earlier in the list must not end the search early.
            if index == last_index:
                raise
            continue
        if data:
            return await parse_user(username, data)
    return None
|
|
|
|
|
|
async def get_user_nitter(username: str) -> Optional[User]:
    """Try each configured Nitter host in order until one yields a feed.

    Returns:
        The ``User`` built from the first host that returns a truthy feed,
        or ``None`` when no host produced one without the final host raising.

    Raises:
        HostNeedChange: The last configured host raised ``HostNeedChange``.
        UsernameNotFound: Propagated unchanged from ``nitter_get``.
    """
    last_index = len(nitter_host) - 1
    for index, host in enumerate(nitter_host):
        try:
            data = await nitter_get(username, host)
        except HostNeedChange:
            # Compare by position, not by value: a host string that also
            # appears earlier in the list must not end the search early.
            if index == last_index:
                raise
            continue
        if data:
            return await parse_user(username, data)
    return None
|