twitter2telegram/defs/feed.py

132 lines
3.6 KiB
Python

import traceback
from asyncio import Lock
from datetime import datetime
from functools import wraps
from typing import List, Optional

from bs4 import BeautifulSoup
from feedparser import parse, FeedParserDict
from httpx import RequestError

from init import request
from defs.glover import rss_hub_host, nitter_host
from defs.models import Tweet, User
# Module-wide asyncio lock; nitter_get acquires it around its HTTP request so
# that those requests are issued one at a time.
LOCK = Lock()
class UsernameNotFound(Exception):
    """Raised when a feed host answers 404 for the requested username.

    get_user also raises it when the nitter fallback ends in HostNeedChange.
    """
class HostNeedChange(Exception):
    """Raised when a host answers with a status other than 200 or 404.

    Callers use it as the signal to move on to the next configured host.
    """
def retry(func):
    """Decorate an async callable so that it is attempted up to three times.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper that carries ``func``'s metadata (name, docstring)
        and awaits ``func`` with the given arguments. The wrapper returns
        ``func``'s result on the first successful attempt. It returns
        ``None`` when the final attempt fails with ``RequestError``.

    Raises:
        HostNeedChange: Re-raised by the wrapper when the final attempt
            fails with ``HostNeedChange``.

    Any other exception raised by ``func`` propagates immediately.
    """
    max_attempts = 3

    @wraps(func)
    async def wrapper(*args, **kwargs):
        for attempt in range(1, max_attempts + 1):
            try:
                return await func(*args, **kwargs)
            except HostNeedChange:
                # Out of attempts: surface the failure so the caller can
                # switch to another host.
                if attempt == max_attempts:
                    raise
            except RequestError:
                # Transport-level failure: simply try again.
                continue
        # Reached only when the last attempt ended in RequestError; callers
        # treat a falsy result as "no data from this host".
        return None

    return wrapper
@retry
async def rsshub_get(username: str, host: str) -> Optional[FeedParserDict]:
    """Fetch the RSSHub Twitter feed of ``username`` from ``host`` and parse it.

    Returns:
        The parsed feed when the host answers with HTTP 200.

    Raises:
        UsernameNotFound: The host answered with HTTP 404.
        HostNeedChange: The host answered with any other status.
    """
    response = await request.get(f"{host}/twitter/user/{username}")
    status = response.status_code
    if status == 404:
        raise UsernameNotFound
    if status != 200:
        raise HostNeedChange
    return parse(response.text)
@retry
async def nitter_get(username: str, host: str) -> Optional[FeedParserDict]:
    """Fetch the Nitter RSS feed of ``username`` from ``host`` and parse it.

    The HTTP request is made while holding the module-level ``LOCK``.

    Returns:
        The parsed feed when the host answers with HTTP 200.

    Raises:
        UsernameNotFound: The host answered with HTTP 404.
        HostNeedChange: The host answered with any other status.
    """
    feed_url = f"{host}/{username}/rss"
    async with LOCK:
        response = await request.get(feed_url)
    status = response.status_code
    if status == 404:
        raise UsernameNotFound
    if status != 200:
        raise HostNeedChange
    return parse(response.text)
async def parse_tweets(data: List[FeedParserDict]) -> List[Tweet]:
    """Build Tweet objects from feed entries.

    For each entry the ``description`` HTML is parsed with BeautifulSoup to
    obtain the plain text and the ``src`` of every ``<img>`` tag; ``link``
    becomes ``old_url`` and ``published`` is parsed into a datetime.

    An entry that raises during conversion has its traceback printed and is
    left out of the result.

    Args:
        data: Feed entries exposing ``.get`` for ``description``, ``link``
            and ``published``.

    Returns:
        The successfully converted tweets, in input order.
    """
    parsed = []
    for entry in data:
        try:
            soup = BeautifulSoup(entry.get("description", ""), "lxml")
            image_urls = []
            for node in soup.find_all("img"):
                src = node.get("src")
                if src:
                    image_urls.append(src)
            published = datetime.strptime(
                entry.get("published", ""), "%a, %d %b %Y %H:%M:%S %Z"
            )
            item = Tweet(
                content=soup.get_text(),
                old_url=entry.get("link", ""),
                time=published,
                images=image_urls,
            )
        except Exception:
            # Best effort: report the bad entry and keep going.
            traceback.print_exc()
        else:
            parsed.append(item)
    return parsed
async def parse_user(username: str, data: FeedParserDict) -> User:
    """Build a User from a parsed feed.

    The display name is the feed title with the literal ``"Twitter @"``
    removed; the tweets come from running parse_tweets over the feed entries.

    Args:
        username: Stored on the returned User as ``username``.
        data: Parsed feed; ``feed.title`` and ``entries`` are read, with
            empty defaults when missing.

    Returns:
        The assembled User.
    """
    feed_title = data.get("feed", {}).get("title", "")
    return User(
        username=username,
        name=feed_title.replace("Twitter @", ""),
        tweets=await parse_tweets(data.get("entries", [])),
    )
async def get_user(username: str) -> Optional[User]:
    """Look up ``username`` via RSSHub first, then via Nitter.

    Nitter is consulted when the RSSHub lookup raises HostNeedChange or
    returns a falsy result.

    Returns:
        The User from whichever source produced one, or whatever the Nitter
        lookup returned (possibly ``None``).

    Raises:
        UsernameNotFound: The Nitter lookup raised HostNeedChange, or either
            lookup raised UsernameNotFound itself.
    """
    try:
        user = await get_user_rsshub(username)
    except HostNeedChange:
        user = None
    if user:
        return user

    try:
        return await get_user_nitter(username)
    except HostNeedChange:
        raise UsernameNotFound
async def get_user_rsshub(username: str) -> Optional[User]:
    """Try each configured RSSHub host in order until one yields a feed.

    Args:
        username: The account to look up.

    Returns:
        The parsed User from the first host that returns a non-empty feed,
        or ``None`` when no host produced data and the last host did not
        raise HostNeedChange.

    Raises:
        HostNeedChange: The last configured host raised HostNeedChange.
        UsernameNotFound: Propagated unchanged from rsshub_get.
    """
    last_index = len(rss_hub_host) - 1
    for index, host in enumerate(rss_hub_host):
        try:
            data = await rsshub_get(username, host)
        except HostNeedChange:
            # Decide "last host" by position rather than by value, so a host
            # listed more than once does not end the search early.
            if index == last_index:
                raise
            continue
        if data:
            return await parse_user(username, data)
    return None
async def get_user_nitter(username: str) -> Optional[User]:
    """Try each configured Nitter host in order until one yields a feed.

    Args:
        username: The account to look up.

    Returns:
        The parsed User from the first host that returns a non-empty feed,
        or ``None`` when no host produced data and the last host did not
        raise HostNeedChange.

    Raises:
        HostNeedChange: The last configured host raised HostNeedChange.
        UsernameNotFound: Propagated unchanged from nitter_get.
    """
    last_index = len(nitter_host) - 1
    for index, host in enumerate(nitter_host):
        try:
            data = await nitter_get(username, host)
        except HostNeedChange:
            # Decide "last host" by position rather than by value, so a host
            # listed more than once does not end the search early.
            if index == last_index:
                raise
            continue
        if data:
            return await parse_user(username, data)
    return None