import traceback from asyncio import Lock from datetime import datetime from typing import List, Optional from bs4 import BeautifulSoup from httpx import RequestError from init import request from defs.glover import rss_hub_host, nitter_host from defs.models import Tweet, User from feedparser import parse, FeedParserDict LOCK = Lock() class UsernameNotFound(Exception): pass class HostNeedChange(Exception): pass def retry(func): async def wrapper(*args, **kwargs): for i in range(3): try: return await func(*args, **kwargs) except HostNeedChange: if i == 2: raise HostNeedChange continue except RequestError: continue return wrapper @retry async def rsshub_get(username: str, host: str) -> Optional[FeedParserDict]: url = f"{host}/twitter/user/{username}" response = await request.get(url) if response.status_code == 200: return parse(response.text) elif response.status_code == 404: raise UsernameNotFound raise HostNeedChange @retry async def nitter_get(username: str, host: str) -> Optional[FeedParserDict]: url = f"{host}/{username}/rss" async with LOCK: response = await request.get(url) if response.status_code == 200: return parse(response.text) elif response.status_code == 404: raise UsernameNotFound raise HostNeedChange async def parse_tweets(data: List[FeedParserDict]) -> List[Tweet]: tweets = [] for tweet in data: try: description = tweet.get("description", "") soup = BeautifulSoup(description, "lxml") content = soup.get_text() img_tag = soup.find_all("img") images = [img.get("src") for img in img_tag if img.get("src")] url = tweet.get("link", "") time = datetime.strptime(tweet.get("published", ""), "%a, %d %b %Y %H:%M:%S %Z") tweets.append( Tweet( content=content, old_url=url, time=time, images=images ) ) except Exception: traceback.print_exc() return tweets async def parse_user(username: str, data: FeedParserDict) -> User: title = data.get("feed", {}).get("title", "") name = title.replace("Twitter @", "") tweets = await parse_tweets(data.get("entries", [])) return User(username=username, name=name, tweets=tweets) async def get_user(username: str) -> Optional[User]: data = None try: data = await get_user_rsshub(username) except HostNeedChange: pass if not data: try: data = await get_user_nitter(username) except HostNeedChange: raise UsernameNotFound return data async def get_user_rsshub(username: str) -> Optional[User]: for host in rss_hub_host: try: data = await rsshub_get(username, host) if data: return await parse_user(username, data) except HostNeedChange: if host == rss_hub_host[-1]: raise HostNeedChange continue return None async def get_user_nitter(username: str) -> Optional[User]: for host in nitter_host: try: data = await nitter_get(username, host) if data: return await parse_user(username, data) except HostNeedChange: if host == nitter_host[-1]: raise HostNeedChange continue return None