import contextlib
import os
import re
from io import BytesIO
from typing import List
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from pyrogram.types import InputMediaPhoto, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaDocument, \
InputMediaAnimation, Message
from init import request
class LofterItem:
def __init__(self, url, audio_link, title: str, origin_url, username, name, comment, tags):
self.url = url
self.audio_link = f"https://music.163.com/#/song?id={audio_link}"
self.only_text = url is None
self.file = None
self.origin_url = origin_url
self.username = username
self.text = f"Lofter Status Info\n\n" \
f"{title.strip()}
\n\n" \
f"✍️ {name}\n" \
f"{tags}\n" \
f"{comment}"
async def init(self):
if self.only_text:
return
file = await request.get(self.url, timeout=30)
file = BytesIO(file.content)
file.name = os.path.basename(self.url).split('?')[0]
self.file = file
async def reply_to(self, message: Message, static: bool = False):
if not self.file:
await self.init()
if static:
await message.reply_document(self.file, caption=self.text, quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
elif self.only_text:
await message.reply_text(self.text, quote=True, disable_web_page_preview=True,
reply_markup=lofter_link(self.audio_link, self.origin_url, self.username))
elif self.file.name.endswith('.gif'):
await message.reply_animation(self.file, caption=self.text, quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
else:
await message.reply_photo(self.file, caption=self.text, quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
async def export(self, static: bool = False, first: bool = False):
if not self.file:
await self.init()
if static:
return InputMediaDocument(self.file, caption=self.text if first else None)
elif self.file.name.endswith('.gif'):
return InputMediaAnimation(self.file, caption=self.text if first else None)
else:
return InputMediaPhoto(self.file, caption=self.text if first else None)
async def input_media(img: List[LofterItem], static: bool = False):
return [(await img[ff].export(static, ff == 0)) for ff in range(len(img))]
async def get_loft(url: str) -> List[LofterItem]:
res = await request.get(url)
assert res.status_code == 200
username, avatar, name, bio, soup = parse_loft_user(url, res.text)
title = soup.findAll("div", {"class": "text"})[-1].getText().strip()
links = soup.findAll("a", {"class": "imgclasstag"})
audio_link = None
audio = soup.find("div", {"class": "img"})
if audio and audio.getText().strip():
title = f"分享音乐:{audio.getText().strip()}\n\n{title}" if title else f"分享音乐:{audio.getText().strip()}"
audio_link = re.findall(r"%26id%3D(.*?)%26", audio.findAll("div")[1].get("onclick"))[0]
comment = soup.findAll("h3", {"class": "nctitle"})
comment_text = "".join(f"{i.getText()} " for i in comment)
if "(" not in comment_text:
comment_text = ""
tags = soup.find("meta", {"name": "Keywords"}).get("content")
tags_text = "".join(f"#{i} " for i in tags.split(","))
return [LofterItem(i.get("bigimgsrc"), audio_link, title, url, username, name, comment_text, tags_text)
for i in links] if links else [
LofterItem(None, audio_link, title, url, username, name, comment_text, tags_text)]
def parse_loft_user(url: str, content: str):
username = urlparse(url).hostname.split(".")[0]
soup = BeautifulSoup(content, "lxml")
name, bio, avatar = "未知用户", "", None # noqa
with contextlib.suppress(Exception):
name = soup.find("title").getText().split("-")[-1].strip()
with contextlib.suppress(Exception):
bio = " - ".join(soup.find("meta", {"name": "Description"}).get("content").split(" - ")[1:])
with contextlib.suppress(Exception):
avatar = soup.find("link", {"rel": "shortcut icon"}).get("href").split("?")[0]
if user := soup.find("div", {"class": "selfinfo"}):
name = user.find("h1").getText()
bio = user.find("div", {"class": "text"}).getText()
return username, avatar, name, bio, soup
if user := soup.find("div", {"class": "g-hdc box"}):
name = user.find("h1").getText()
bio = user.find("p", {"class": "cont"}).getText()
return username, avatar, name, bio, soup
return username, avatar, name, bio, soup
async def get_loft_user(url: str):
res = await request.get(url)
assert res.status_code == 200
username, avatar, name, bio, soup = parse_loft_user(url, res.text)
status_link = None
for i in soup.findAll("a"):
url = i.get("href")
if url and "lofter.com/post/" in url:
status_link = url
break
text = f"Lofter User Info\n\n" \
f"Name: {name}
\n" \
f"Username: {username}\n" \
f"Bio: "
return text, avatar, username, status_link
def lofter_link(url, origin, username):
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin),
InlineKeyboardButton(text="Origin", url=url),
InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]]) if url \
else InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin),
InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]])
def lofter_user_link(username, status_link):
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com"),
InlineKeyboardButton(text="Status", url=status_link)]]) if status_link else \
InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]])