mirror of
https://github.com/Xtao-Labs/iShotaBot.git
synced 2024-11-16 04:35:55 +00:00
⚡️ 支持解析更多类型
This commit is contained in:
parent
99df24cd0e
commit
2bda99e05f
107
defs/lofter.py
107
defs/lofter.py
@ -1,22 +1,34 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
from io import BytesIO
|
||||
from typing import List
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from pyrogram.types import InputMediaPhoto, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaDocument, \
|
||||
InputMediaAnimation, InputMediaVideo, Message
|
||||
|
||||
from init import request, bot
|
||||
from pyrogram.types import InputMediaPhoto, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaDocument, \
|
||||
InputMediaAnimation, Message
|
||||
|
||||
from init import request
|
||||
|
||||
|
||||
class LofterItem:
|
||||
def __init__(self, url, title):
|
||||
def __init__(self, url, audio_link, title: str, origin_url, username, name, comment, tags):
|
||||
self.url = url
|
||||
self.audio_link = f"https://music.163.com/#/song?id={audio_link}"
|
||||
self.only_text = url is None
|
||||
self.file = None
|
||||
self.text = title
|
||||
self.origin_url = origin_url
|
||||
self.username = username
|
||||
self.text = f"<b>Lofter Status Info</b>\n\n" \
|
||||
f"<code>{title.strip()}</code>\n\n" \
|
||||
f"✍️ <a href=\"https://{username}.lofter.com/\">{name}</a>\n" \
|
||||
f"{tags}\n" \
|
||||
f"{comment}"
|
||||
|
||||
async def init(self):
|
||||
if self.only_text:
|
||||
return
|
||||
file = await request.get(self.url, timeout=30)
|
||||
file = BytesIO(file.content)
|
||||
file.name = os.path.basename(self.url).split('?')[0]
|
||||
@ -26,13 +38,17 @@ class LofterItem:
|
||||
if not self.file:
|
||||
await self.init()
|
||||
if static:
|
||||
await message.reply_document(self.file, caption=self.text, quote=True, reply_markup=gen_button(self.url))
|
||||
await message.reply_document(self.file, caption=self.text, quote=True,
|
||||
reply_markup=lofter_link(self.url, self.origin_url, self.username))
|
||||
elif self.only_text:
|
||||
await message.reply_text(self.text, quote=True, disable_web_page_preview=True,
|
||||
reply_markup=lofter_link(self.audio_link, self.origin_url, self.username))
|
||||
elif self.file.name.endswith('.gif'):
|
||||
await message.reply_animation(self.file, caption=self.text, quote=True, reply_markup=gen_button(self.url))
|
||||
elif self.file.name.endswith('.mp4'):
|
||||
await message.reply_video(self.file, caption=self.text, quote=True, reply_markup=gen_button(self.url))
|
||||
await message.reply_animation(self.file, caption=self.text, quote=True,
|
||||
reply_markup=lofter_link(self.url, self.origin_url, self.username))
|
||||
else:
|
||||
await message.reply_photo(self.file, caption=self.text, quote=True, reply_markup=gen_button(self.url))
|
||||
await message.reply_photo(self.file, caption=self.text, quote=True,
|
||||
reply_markup=lofter_link(self.url, self.origin_url, self.username))
|
||||
|
||||
async def export(self, static: bool = False, first: bool = False):
|
||||
if not self.file:
|
||||
@ -41,19 +57,10 @@ class LofterItem:
|
||||
return InputMediaDocument(self.file, caption=self.text if first else None)
|
||||
elif self.file.name.endswith('.gif'):
|
||||
return InputMediaAnimation(self.file, caption=self.text if first else None)
|
||||
elif self.file.name.endswith('.mp4'):
|
||||
return InputMediaVideo(self.file, caption=self.text if first else None)
|
||||
else:
|
||||
return InputMediaPhoto(self.file, caption=self.text if first else None)
|
||||
|
||||
|
||||
def gen_button(url):
|
||||
data = urlparse(url)
|
||||
return InlineKeyboardMarkup([[
|
||||
InlineKeyboardButton(text="Source", url=url),
|
||||
InlineKeyboardButton(text="Author", url=f"https://{data.hostname}")]])
|
||||
|
||||
|
||||
async def input_media(img: List[LofterItem], static: bool = False):
|
||||
return [(await img[ff].export(static, ff == 0)) for ff in range(len(img))]
|
||||
|
||||
@ -61,7 +68,61 @@ async def input_media(img: List[LofterItem], static: bool = False):
|
||||
async def get_loft(url: str) -> List[LofterItem]:
|
||||
res = await request.get(url)
|
||||
assert res.status_code == 200
|
||||
soup = BeautifulSoup(res.text, "lxml")
|
||||
title = soup.findAll("div", {"class": "text"})[-1].getText()
|
||||
username, avatar, name, bio, soup = parse_loft_user(url, res.text)
|
||||
title = soup.findAll("div", {"class": "text"})[-1].getText().strip()
|
||||
links = soup.findAll("a", {"class": "imgclasstag"})
|
||||
return [LofterItem(i.get("bigimgsrc"), title) for i in links]
|
||||
audio_link = None
|
||||
audio = soup.find("div", {"class": "img"})
|
||||
if audio and audio.getText().strip():
|
||||
title = f"分享音乐:{audio.getText().strip()}\n\n{title}" if title else f"分享音乐:{audio.getText().strip()}"
|
||||
audio_link = re.findall(r"%26id%3D(.*?)%26", audio.findAll("div")[1].get("onclick"))[0]
|
||||
comment = soup.findAll("h3", {"class": "nctitle"})
|
||||
comment_text = "".join(f"{i.getText()} " for i in comment)
|
||||
if "(" not in comment_text:
|
||||
comment_text = ""
|
||||
tags = soup.find("meta", {"name": "Keywords"}).get("content")
|
||||
tags_text = "".join(f"#{i} " for i in tags.split(","))
|
||||
return [LofterItem(i.get("bigimgsrc"), audio_link, title, url, username, name, comment_text, tags_text)
|
||||
for i in links] if links else [
|
||||
LofterItem(None, audio_link, title, url, username, name, comment_text, tags_text)]
|
||||
|
||||
|
||||
def parse_loft_user(url: str, content: str):
|
||||
username = urlparse(url).hostname.split(".")[0]
|
||||
soup = BeautifulSoup(content, "lxml")
|
||||
user = soup.find("div", {"class": "selfinfo"})
|
||||
avatar = user.find("img").get("src").split("?")[0]
|
||||
name = user.find("h1").getText()
|
||||
bio = user.find("div", {"class": "text"}).getText()
|
||||
return username, avatar, name, bio, soup
|
||||
|
||||
|
||||
async def get_loft_user(url: str):
|
||||
res = await request.get(url)
|
||||
assert res.status_code == 200
|
||||
username, avatar, name, bio, soup = parse_loft_user(url, res.text)
|
||||
status_link = None
|
||||
for i in soup.findAll("a"):
|
||||
url = i.get("href")
|
||||
if url and "lofter.com/post/" in url:
|
||||
status_link = url
|
||||
break
|
||||
text = f"<b>Lofter User Info</b>\n\n" \
|
||||
f"Name: <code>{name}</code>\n" \
|
||||
f"Username: <a href=\"https://{username}.lofter.com\">{username}</a>\n" \
|
||||
f"Bio: <code{bio}</code>"
|
||||
return text, avatar, username, status_link
|
||||
|
||||
|
||||
def lofter_link(url, origin, username):
|
||||
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin),
|
||||
InlineKeyboardButton(text="Origin", url=url),
|
||||
InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]]) if url \
|
||||
else InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin),
|
||||
InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]])
|
||||
|
||||
|
||||
def lofter_user_link(username, status_link):
|
||||
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com"),
|
||||
InlineKeyboardButton(text="Status", url=status_link)]]) if status_link else \
|
||||
InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]])
|
||||
|
@ -2,11 +2,11 @@ from pyrogram import Client, filters, ContinuePropagation
|
||||
from pyrogram.enums import MessageEntityType
|
||||
from pyrogram.types import Message
|
||||
|
||||
from defs.lofter import get_loft, input_media
|
||||
from defs.lofter import get_loft, input_media, get_loft_user, lofter_user_link
|
||||
|
||||
|
||||
@Client.on_message(filters.incoming & filters.text &
|
||||
filters.regex(r"lofter.com/post/"))
|
||||
filters.regex(r"lofter.com"))
|
||||
async def lofter_share(_: Client, message: Message):
|
||||
if not message.text:
|
||||
return
|
||||
@ -20,13 +20,22 @@ async def lofter_share(_: Client, message: Message):
|
||||
url = entity.url
|
||||
else:
|
||||
continue
|
||||
img = await get_loft(url)
|
||||
if not img:
|
||||
continue
|
||||
if len(img) == 1:
|
||||
await img[0].reply_to(message, static=static)
|
||||
if "/post/" in url:
|
||||
img = await get_loft(url)
|
||||
if not img:
|
||||
continue
|
||||
if len(img) == 1:
|
||||
await img[0].reply_to(message, static=static)
|
||||
else:
|
||||
await message.reply_media_group(media=await input_media(img[:9], static), quote=True)
|
||||
else:
|
||||
await message.reply_media_group(media=await input_media(img[:9], static), quote=True)
|
||||
text, avatar, username, status_link = await get_loft_user(url)
|
||||
await message.reply_photo(
|
||||
avatar,
|
||||
caption=text,
|
||||
quote=True,
|
||||
reply_markup=lofter_user_link(username, status_link)
|
||||
)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
breakpoint()
|
||||
|
@ -4,7 +4,10 @@ from defs.button import gen_button, Button
|
||||
|
||||
des = """本机器人特性:
|
||||
|
||||
★ 解析 bilibili 视频
|
||||
★ 解析 bilibili 视频、动态
|
||||
★ 解析 twitter 推文、用户
|
||||
★ 解析 lofter 日志、用户
|
||||
★ 汇率查询
|
||||
★ 复读机(3条)
|
||||
★ 答案之书
|
||||
★ 鲁迅说过
|
||||
|
Loading…
Reference in New Issue
Block a user