切换到 bilibili-api 库

This commit is contained in:
xtaodada 2023-05-27 21:45:56 +08:00
parent 8d0720e919
commit 7950b18020
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
17 changed files with 102 additions and 703 deletions

View File

@ -13,12 +13,6 @@ port = 1080
[basic]
ipv6 = False
[twitter]
consumer_key = ABCD
consumer_secret = ABCD
access_token_key = ABCD
access_token_secret = ABCD
[post]
admin = 0
lofter_channel = 0
@ -26,3 +20,4 @@ lofter_channel_username = username
[api]
amap_key = ABCD
bili_cookie = ABCD

View File

@ -1,115 +0,0 @@
import time
from io import BytesIO
from PIL import Image
import httpx
import jinja2
import random
from os import sep
from init import logger
from defs.browser import html_to_pic
from defs.diff import diff_text
env = jinja2.Environment(enable_async=True)
with open(f"resources{sep}templates{sep}article.html", "r", encoding="utf-8") as f:
article_data = f.read()
article_tpl = env.from_string(article_data)
async def check_text(text: str):
try:
url = "https://asoulcnki.asia/v1/api/check"
async with httpx.AsyncClient() as client:
resp = await client.post(url=url, json={"text": text})
result = resp.json()
if result["code"] != 0:
return None, None
data = result["data"]
if not data["related"]:
return None, "没有找到重复的小作文捏"
rate = data["rate"]
related = data["related"][0]
reply_url = str(related["reply_url"]).strip()
reply = related["reply"]
msg = [
"枝网文本复制检测报告",
"",
"总复制比 {:.2f}%".format(rate * 100),
f'相似小作文: <a href="{reply_url}">地点</a> - '
f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["ctime"]))}',
]
image = await render_reply(reply, diff=text)
if not image:
return None, "\n".join(msg)
return image, "\n".join(msg)
except Exception as e:
logger.warning(f"Error in check_text: {e}")
return None, None
async def random_text(keyword: str = ""):
try:
url = "https://asoulcnki.asia/v1/api/ranking"
params = {"pageSize": 10, "pageNum": 1, "timeRangeMode": 0, "sortMode": 0}
if keyword:
params["keywords"] = keyword
else:
params["pageNum"] = random.randint(1, 100)
async with httpx.AsyncClient() as client:
resp = await client.get(url=url, params=params)
result = resp.json()
if result["code"] != 0:
return None, None
replies = result["data"]["replies"]
if not replies:
return None, "没有找到小作文捏"
reply = random.choice(replies)
image = await render_reply(reply)
reply_url = (
f"https://t.bilibili.com/{reply['dynamic_id']}/#reply{reply['rpid']}"
)
if not image:
return None, f'<a href="{reply_url}">转到小作文</a>'
return image, f'<a href="{reply_url}">转到小作文</a>'
except Exception as e:
logger.warning(f"Error in random_text: {e}")
return None, None
async def render_reply(reply: dict, diff: str = ""):
try:
article = {}
article["username"] = reply["m_name"]
article["like"] = reply["like_num"]
article["all_like"] = reply["similar_like_sum"]
article["quote"] = reply["similar_count"]
article["text"] = (
diff_text(diff, reply["content"]) if diff else reply["content"]
)
article["time"] = time.strftime("%Y-%m-%d", time.localtime(reply["ctime"]))
html = await article_tpl.render_async(article=article)
img_raw = await html_to_pic(
html, wait=0, viewport={"width": 500, "height": 100}
)
# 将bytes结果转化为字节流
bytes_stream = BytesIO(img_raw)
# 读取到图片
img = Image.open(bytes_stream)
imgByteArr = BytesIO() # 初始化一个空字节流
img.save(imgByteArr, format("PNG")) # 把我们得图片以 PNG 保存到空字节流
imgByteArr = imgByteArr.getvalue() # 无视指针获取全部内容类型由io流变成bytes。
with open(f"data{sep}asoulcnki.png", "wb") as i:
i.write(imgByteArr)
return f"data{sep}asoulcnki.png"
except Exception as e:
logger.warning(f"Error in render_reply: {e}")
return None

View File

@ -1,19 +1,44 @@
import re
from os import sep
from typing import Optional
import qrcode
import string
from bilibili_api import Credential
from bilibili_api.video import Video
from bilibili_api.user import User
from pyrogram import ContinuePropagation
from qrcode.image.pil import PilImage
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
from defs.cookie import get_bili_cookie, get_bili_browser_cookie
from defs.browser import get_browser
from headers import bili_headers
from init import request
def from_cookie_get_credential() -> Optional[Credential]:
"""
cookie 中获取 Credential 对象
Returns:
Credential: Credential 对象
"""
cookie = get_bili_cookie()
try:
sessdata = cookie["SESSDATA"]
bili_jct = cookie["bili_jct"]
buvid3 = cookie["buvid3"]
dedeuserid = cookie["DedeUserID"]
except KeyError:
return None
return Credential(sessdata, bili_jct, buvid3, dedeuserid)
credential = from_cookie_get_credential()
def cut_text(old_str, cut):
"""
:说明: `get_cut_str`
@ -39,7 +64,7 @@ def cut_text(old_str, cut):
next_str = next_str[1:]
elif s == "\n":
str_list.append(next_str[: i - 1])
next_str = next_str[i - 1 :]
next_str = next_str[i - 1:]
si = 0
i = 0
continue
@ -88,17 +113,12 @@ async def b23_extract(text):
async def video_info_get(cid):
if cid[:2] == "av":
video_info = await request.get(
f"https://api.bilibili.com/x/web-interface/view?aid={cid[2:]}"
)
video_info = video_info.json()
v = Video(aid=cid[2:], credential=credential)
elif cid[:2] == "BV":
video_info = await request.get(
f"https://api.bilibili.com/x/web-interface/view?bvid={cid}"
)
video_info = video_info.json()
v = Video(bvid=cid, credential=credential)
else:
return
video_info = await v.get_info()
return video_info
@ -115,7 +135,7 @@ def numf(num: int):
async def binfo_image_create(video_info: dict):
bg_y = 0
# 封面
pic_url = video_info["data"]["pic"]
pic_url = video_info["pic"]
pic_get = (await request.get(pic_url)).content
pic_bio = BytesIO(pic_get)
pic = Image.open(pic_bio)
@ -125,7 +145,7 @@ async def binfo_image_create(video_info: dict):
bg_y += 350 + 20
# 时长
minutes, seconds = divmod(video_info["data"]["duration"], 60)
minutes, seconds = divmod(video_info["duration"], 60)
hours, minutes = divmod(minutes, 60)
video_time = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
tiem_font = ImageFont.truetype(
@ -135,12 +155,12 @@ async def binfo_image_create(video_info: dict):
draw.text((10, 305), video_time, "white", tiem_font)
# 分区
tname = video_info["data"]["tname"]
tname = video_info["tname"]
tname_x, _ = tiem_font.getsize(tname)
draw.text((560 - tname_x - 10, 305), tname, "white", tiem_font)
# 标题
title = video_info["data"]["title"]
title = video_info["title"]
title_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 25
)
@ -154,7 +174,7 @@ async def binfo_image_create(video_info: dict):
# 简介
dynamic = (
"该视频没有简介" if video_info["data"]["desc"] == "" else video_info["data"]["desc"]
"该视频没有简介" if video_info["desc"] == "" else video_info["desc"]
)
dynamic_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18
@ -175,11 +195,11 @@ async def binfo_image_create(video_info: dict):
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 26
)
view = numf(video_info["data"]["stat"]["view"]) # 播放 \uE6E6
danmaku = numf(video_info["data"]["stat"]["danmaku"]) # 弹幕 \uE6E7
favorite = numf(video_info["data"]["stat"]["favorite"]) # 收藏 \uE6E1
coin = numf(video_info["data"]["stat"]["coin"]) # 投币 \uE6E4
like = numf(video_info["data"]["stat"]["like"]) # 点赞 \uE6E0
view = numf(video_info["stat"]["view"]) # 播放 \uE6E6
danmaku = numf(video_info["stat"]["danmaku"]) # 弹幕 \uE6E7
favorite = numf(video_info["stat"]["favorite"]) # 收藏 \uE6E1
coin = numf(video_info["stat"]["coin"]) # 投币 \uE6E4
like = numf(video_info["stat"]["like"]) # 点赞 \uE6E0
info_bg = Image.new("RGB", (560, 170), "#F5F5F7")
draw = ImageDraw.Draw(info_bg)
@ -199,46 +219,39 @@ async def binfo_image_create(video_info: dict):
# UP主
# 等级 0-4 \uE6CB-F 5-6\uE6D0-1
# UP \uE723
if "staff" in video_info["data"]:
if "staff" in video_info:
up_list = []
for up in video_info["data"]["staff"]:
for up in video_info["staff"]:
up_mid = up["mid"]
up_data = (await request.get(
f"https://api.bilibili.com/x/space/acc/info?mid={up_mid}",
headers=bili_headers,
)).json()
u = User(up_mid, credential=credential)
up_data = await u.get_user_info()
up_list.append(
{
"name": up["name"],
"up_title": up["title"],
"face": up["face"],
"color": up_data["data"]["vip"]["nickname_color"]
if up_data["data"]["vip"]["nickname_color"] != ""
"color": up_data["vip"]["nickname_color"]
if up_data["vip"]["nickname_color"] != ""
else "black",
"follower": up["follower"],
"level": up_data["data"]["level"],
"level": up_data["level"],
}
)
else:
up_mid = video_info["data"]["owner"]["mid"]
up_data = (await request.get(
f"https://api.bilibili.com/x/space/wbi/acc/info?mid={up_mid}",
headers=bili_headers,
)).json()
up_stat = (await request.get(
f"https://api.bilibili.com/x/relation/stat?vmid={up_mid}",
headers=bili_headers,
)).json()
up_mid = video_info["owner"]["mid"]
u = User(up_mid, credential=credential)
up_data = await u.get_user_info()
up_stat = await u.get_relation_info()
up_list = [
{
"name": up_data["data"]["name"],
"name": up_data["name"],
"up_title": "UP主",
"face": up_data["data"]["face"],
"color": up_data["data"]["vip"]["nickname_color"]
if up_data["data"]["vip"]["nickname_color"] != ""
"face": up_data["face"],
"color": up_data["vip"]["nickname_color"]
if up_data["vip"]["nickname_color"] != ""
else "black",
"follower": up_stat["data"]["follower"],
"level": up_data["data"]["level"],
"follower": up_stat["follower"],
"level": up_data["level"],
}
]
up_num = len(up_list)
@ -330,7 +343,7 @@ async def binfo_image_create(video_info: dict):
draw = ImageDraw.Draw(baner_bg)
# 二维码
qr = qrcode.QRCode(border=1)
qr.add_data("https://b23.tv/" + video_info["data"]["bvid"])
qr.add_data("https://b23.tv/" + video_info["bvid"])
qr_image = qr.make_image(PilImage, fill_color=icon_color, back_color="#F5F5F7")
qr_image = qr_image.resize((140, 140))
baner_bg.paste(qr_image, (50, 10))
@ -365,16 +378,7 @@ async def get_dynamic_screenshot_pc(dynamic_id):
viewport={"width": 2560, "height": 1080},
device_scale_factor=2,
)
await context.add_cookies(
[
{
"name": "hit-dyn-v2",
"value": "1",
"domain": ".bilibili.com",
"path": "/",
}
]
)
await context.add_cookies(get_bili_browser_cookie())
page = await context.new_page()
try:
await page.goto(url, wait_until="networkidle", timeout=10000)

27
defs/cookie.py Normal file
View File

@ -0,0 +1,27 @@
from typing import Dict, List
from defs.glover import bili_cookie
def get_bili_cookie() -> Dict[str, str]:
data = {}
for i in bili_cookie.split(";"):
if i:
k, v = i.split("=")
data[k] = v
return data
def get_bili_browser_cookie() -> List[Dict[str, str]]:
cookie = get_bili_cookie()
data = []
for k, v in cookie.items():
data.append(
{
"name": k,
"value": v,
"domain": ".bilibili.com",
"path": "/",
}
)
return data

View File

@ -7,36 +7,26 @@ api_id: int = 0
api_hash: str = ""
# [Basic]
ipv6: Union[bool, str] = "False"
# [twitter]
consumer_key: str = ""
consumer_secret: str = ""
access_token_key: str = ""
access_token_secret: str = ""
# [post]
admin: int = 0
lofter_channel: int = 0
lofter_channel_username: str = ""
# [api]
amap_key: str = ""
bili_cookie: str = ""
config = RawConfigParser()
config.read("config.ini")
api_id = config.getint("pyrogram", "api_id", fallback=api_id)
api_hash = config.get("pyrogram", "api_hash", fallback=api_hash)
ipv6 = config.get("basic", "ipv6", fallback=ipv6)
consumer_key = config.get("twitter", "consumer_key", fallback=consumer_key)
consumer_secret = config.get("twitter", "consumer_secret", fallback=consumer_secret)
access_token_key = config.get("twitter", "access_token_key", fallback=access_token_key)
access_token_secret = config.get(
"twitter", "access_token_secret", fallback=access_token_secret
)
admin = config.getint("post", "admin", fallback=admin)
lofter_channel = config.getint("post", "lofter_channel", fallback=lofter_channel)
lofter_channel_username = config.get(
"post", "lofter_channel_username", fallback=lofter_channel_username
)
amap_key = config.get("api", "amap_key", fallback=amap_key)
bili_cookie = config.get("api", "bili_cookie", fallback=bili_cookie)
try:
ipv6 = strtobool(ipv6)
ipv6 = bool(strtobool(ipv6))
except ValueError:
ipv6 = False

View File

@ -1,28 +0,0 @@
from defs.browser import get_browser
async def get_mihoyo_screenshot(url):
browser = await get_browser()
context = await browser.new_context(
viewport={"width": 2560, "height": 1080},
device_scale_factor=2,
)
page = await context.new_page()
try:
await page.goto(url, wait_until="networkidle", timeout=180000)
# 被删除或者进审核了
if page.url == "https://www.miyoushe.com/ys/404":
return None
card = await page.wait_for_selector(
".mhy-article-page__main", timeout=180000, state="visible"
)
assert card
clip = await card.bounding_box()
assert clip
clip["width"] += 310
return await page.screenshot(clip=clip, full_page=True)
except Exception as e:
print(f"截取米哈游帖子时发生错误:{e}")
return await page.screenshot(full_page=True)
finally:
await context.close()

View File

@ -1,254 +0,0 @@
import contextlib
from datetime import datetime, timedelta
from defs.glover import (
consumer_key,
consumer_secret,
access_token_key,
access_token_secret,
)
import twitter
from pyrogram.enums import ParseMode
from pyrogram.types import (
InlineKeyboardMarkup,
InlineKeyboardButton,
InputMediaPhoto,
InputMediaVideo,
InputMediaDocument,
InputMediaAnimation,
)
twitter_api = twitter.Api(
consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token_key=access_token_key,
access_token_secret=access_token_secret,
tweet_mode="extended",
timeout=30,
)
def twitter_link(status_id, qid, uid):
if qid:
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Source",
url=f"https://twitter.com/{uid}/status/{status_id}",
),
InlineKeyboardButton(
text="RSource", url=f"https://twitter.com/{qid}"
),
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{uid}"
),
]
]
)
else:
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Source",
url=f"https://twitter.com/{uid}/status/{status_id}",
),
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{uid}"
),
]
]
)
def twitter_user_link(user_username, status_link):
return (
InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{user_username}"
),
InlineKeyboardButton(text="Status", url=status_link),
]
]
)
if status_link
else InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{user_username}"
)
]
]
)
)
def twitter_media(text, media_model, media_list, static: bool = False):
media_lists = []
for ff in range(len(media_model)):
if static:
media_lists.append(
InputMediaDocument(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
elif media_model[ff] == "photo":
media_lists.append(
InputMediaPhoto(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
elif media_model[ff] == "gif":
media_lists.append(
InputMediaAnimation(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
else:
media_lists.append(
InputMediaVideo(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
return media_lists
def get_twitter_time(date: str) -> str:
try:
date = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + timedelta(
hours=8
)
return date.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return date
def get_twitter_user(url_json):
user_name = url_json.name
user_username = url_json.screen_name
status = ""
status_link = None
verified = "💎" if url_json.verified else ""
protected = "🔒" if url_json.protected else ""
if url_json.status:
status_link = f"https://twitter.com/{user_username}/{url_json.status.id_str}"
status = f'🆕 New Status: <a href="{status_link}">{get_twitter_time(url_json.status.created_at)}</a>\n'
text = (
f"<b>Twitter User Info</b>\n\n"
f"Name: {verified}{protected}<code>{user_name}</code>\n"
f'Username: <a href="https://twitter.com/{user_username}">@{user_username}</a>\n'
f"Bio: <code>{url_json.description}</code>\n"
f"Joined: <code>{get_twitter_time(url_json.created_at)}</code>\n"
f"{status}"
f"📤 {url_json.statuses_count} ❤️{url_json.favourites_count} "
f"粉丝 {url_json.followers_count} 关注 {url_json.friends_count}"
)
return text, user_username, status_link
def get_twitter_status(url_json):
created_at = get_twitter_time(url_json.created_at)
favorite_count = (
url_json.favorite_count if hasattr(url_json, "favorite_count") else 0
)
retweet_count = url_json.retweet_count if hasattr(url_json, "retweet_count") else 0
user_name = url_json.user.name
user_username = url_json.user.screen_name
text = url_json.full_text if hasattr(url_json, "full_text") else "暂 无 内 容"
text = f"<code>{text}</code>"
verified = ""
protected = ""
if url_json.user.verified:
verified = "💎"
if url_json.user.protected:
protected = "🔒"
user_text = (
f'{verified}{protected}<a href="https://twitter.com/{user_username}">{user_name}</a> 发表于 {created_at}'
f"\n👍 {favorite_count} 🔁 {retweet_count}"
)
media_model = []
media_list = []
media_alt_list = []
with contextlib.suppress(Exception):
media_info = url_json.media
for i in media_info:
media_url = i.url if hasattr(i, "url") else None
if media_url:
text = text.replace(media_url, "")
if i.type == "photo":
media_model.append("photo")
media_list.append(i.media_url_https)
elif i.type == "animated_gif":
media_model.append("gif")
media_list.append(i.video_info["variants"][0]["url"])
else:
media_model.append("video")
for f in i.video_info["variants"]:
if f["content_type"] == "video/mp4":
media_list.append(f["url"])
break
try:
media_alt_list.append(i.ext_alt_text)
except:
media_alt_list.append("")
quoted_status = False
with contextlib.suppress(Exception):
quoted = url_json.quoted_status
quoted_status = (
quoted.user.screen_name + "/status/" + url_json.quoted_status_id_str
)
quoted_created_at = get_twitter_time(quoted.created_at)
quoted_favorite_count = (
quoted.favorite_count if hasattr(quoted, "favorite_count") else 0
)
quoted_retweet_count = (
quoted.retweet_count if hasattr(quoted, "retweet_count") else 0
)
quoted_user_name = quoted.user.name
quoted_user_username = quoted.user.screen_name
quoted_text = quoted.full_text if hasattr(quoted, "full_text") else "暂 无 内 容"
text += f"\n\n> <code>{quoted_text}</code>"
quoted_verified = ""
quoted_protected = ""
if quoted.user.verified:
quoted_verified = "💎"
if quoted.user.protected:
quoted_protected = "🔒"
user_text += (
f'\n> {quoted_verified}{quoted_protected}<a href="https://twitter.com/{quoted_user_username}">'
f"{quoted_user_name}</a> 发表于 {quoted_created_at}"
f"\n👍 {quoted_favorite_count} 🔁 {quoted_retweet_count}"
)
with contextlib.suppress(Exception):
quoted_media_info = quoted.media
for i in quoted_media_info:
media_url = i.url if hasattr(i, "url") else None
if media_url:
text = text.replace(media_url, "")
if i.type == "photo":
media_model.append("photo")
media_list.append(i.media_url_https)
elif i.type == "animated_gif":
media_model.append("gif")
media_list.append(i.video_info["variants"][0]["url"])
else:
media_model.append("video")
media_list.append(i.video_info["variants"][0]["url"])
try:
media_alt_list.append(i.ext_alt_text)
except:
media_alt_list.append("")
return text, user_text, media_model, media_list, quoted_status

View File

@ -1,6 +1,3 @@
bili_headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36"
}
FANBOX_HEADERS = {
"authority": "api.fanbox.cc",
"accept": "application/json, text/plain, */*",

View File

@ -29,13 +29,13 @@ async def bili_resolve(_: Client, message: Message):
if video_number:
video_number = video_number[0]
video_info = await video_info_get(video_number) if video_number else None
if video_info and video_info["code"] == 0:
if video_info:
image = await binfo_image_create(video_info)
await message.reply_photo(
image,
quote=True,
reply_markup=gen_button(
[Button(0, "Link", "https://b23.tv/" + video_info["data"]["bvid"])]
[Button(0, "Link", "https://b23.tv/" + video_info["bvid"])]
),
)
raise ContinuePropagation

View File

@ -42,7 +42,7 @@ async def dc_command(_: Client, message: Message):
}
dc, mention = get_dc(message)
if dc:
text = f"{mention}所在数据中心为: <b>DC{dc}</b>\n" f"该数据中心位于 <b>{geo_dic[str(dc)]}</b>"
text = f"{mention}所在数据中心为: <b>DC{dc}</b>\n该数据中心位于 <b>{geo_dic[str(dc)]}</b>"
else:
text = f"{mention}需要先<b>设置头像并且对我可见。</b>"
await message.reply(text)

View File

@ -78,7 +78,7 @@ async def fragment_inline(_, inline_query: InlineQuery):
user = await parse_fragment(username)
text = user.text
except NotAvailable:
text = f"用户名:@{username}\n" f"状态:暂未开放购买\n"
text = f"用户名:@{username}\n状态:暂未开放购买\n"
except Exception:
text = ""
if not text:

View File

@ -52,4 +52,4 @@ async def geo_command(_: Client, message: Message):
msg = await message.reply_location(
longitude=float(lat), latitude=float(lon), quote=True
)
await msg.reply(f"坐标:`{lat},{lon}`\n" f"地址:<b>{formatted_address}</b>", quote=True)
await msg.reply(f"坐标:`{lat},{lon}`\n地址:<b>{formatted_address}</b>", quote=True)

View File

@ -34,9 +34,7 @@ async def ip_command(_: Client, message: Message):
"org,as,mobile,proxy,hosting,query"
)
).json()
if ipinfo_json["status"] == "fail":
pass
elif ipinfo_json["status"] == "success":
if ipinfo_json["status"] == "success":
rep_text = ip_info(url, ipinfo_json)
text = ""
if message.entities:
@ -60,13 +58,11 @@ async def ip_command(_: Client, message: Message):
"org,as,mobile,proxy,hosting,query"
)
).json()
if ipinfo_json["status"] == "fail":
pass
elif ipinfo_json["status"] == "success":
if ipinfo_json["status"] == "success":
text = ip_info(url, ipinfo_json)
if text == "":
url = message.text[4:]
if not url == "":
if url != "":
ipinfo_json = (
await request.get(
"http://ip-api.com/json/"
@ -76,13 +72,11 @@ async def ip_command(_: Client, message: Message):
"org,as,mobile,proxy,hosting,query"
)
).json()
if ipinfo_json["status"] == "fail":
pass
elif ipinfo_json["status"] == "success":
if ipinfo_json["status"] == "success":
text = ip_info(url, ipinfo_json)
if rep_text == "" and text == "":
await msg.edit("没有找到要查询的 ip/域名 ...")
elif not rep_text == "" and not text == "":
elif rep_text != "" and text != "":
await msg.edit(f"{rep_text}\n================\n{text}")
else:
await msg.edit(f"{rep_text}{text}")

View File

@ -1,48 +0,0 @@
import re
from io import BytesIO
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.types import Message
from PIL import Image
from defs.mihoyo_bbs import get_mihoyo_screenshot
from defs.button import gen_button, Button
@Client.on_message(
filters.incoming
& filters.text
& filters.regex(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+")
)
async def bili_dynamic(_: Client, message: Message):
# sourcery skip: use-named-expression
try:
p = re.compile(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+")
article = p.search(message.text)
if article:
article_url = article.group()
if not article_url.startswith(("https://", "http://")):
article_url = f"https://{article_url}"
image = await get_mihoyo_screenshot(article_url)
if image:
# 将bytes结果转化为字节流
photo = BytesIO(image)
photo.name = "screenshot.png"
pillow_photo = Image.open(BytesIO(image))
width, height = pillow_photo.size
if abs(height - width) > 1300:
await message.reply_document(
document=photo,
quote=True,
reply_markup=gen_button([Button(0, "Link", article_url)]),
)
else:
await message.reply_photo(
photo,
quote=True,
reply_markup=gen_button([Button(0, "Link", article_url)]),
)
except Exception as e:
print(f"截取米哈游帖子时发生错误:{e}")
raise ContinuePropagation

View File

@ -5,7 +5,6 @@ from defs.button import gen_button, Button
des = """本机器人特性:
解析 bilibili 视频动态
解析 twitter 推文用户
解析 lofter 日志用户
解析 fanbox 发帖用户
汇率查询

View File

@ -1,160 +0,0 @@
import asyncio
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType, ParseMode
from pyrogram.types import Message
from defs.twitter_api import (
twitter_api,
get_twitter_status,
twitter_link,
twitter_media,
twitter_user_link,
get_twitter_user,
)
@Client.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/"))
async def twitter_share(client: Client, message: Message):
if not message.text:
return
static = "static" in message.text
try:
for num in range(len(message.entities)):
entity = message.entities[num]
if entity.type == MessageEntityType.URL:
url = message.text[entity.offset : entity.offset + entity.length]
elif entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
else:
continue
url = urlparse(url)
if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]:
if url.path.find("status") >= 0:
status_id = str(
url.path[url.path.find("status") + 7 :].split("/")[0]
).split("?")[0]
url_json = None
with ThreadPoolExecutor() as executor:
for _ in range(3):
try:
future = client.loop.run_in_executor(
executor, twitter_api.GetStatus, status_id
)
url_json = await asyncio.wait_for(
future, timeout=30, loop=client.loop
)
except Exception as e:
print(e)
return
if url_json:
break
if not url_json:
return
(
text,
user_text,
media_model,
media_list,
quoted_status,
) = get_twitter_status(url_json)
text = f"<b>Twitter Status Info</b>\n\n{text}\n\n{user_text}"
if len(media_model) == 0:
await client.send_message(
message.chat.id,
text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_to_message_id=message.id,
reply_markup=twitter_link(
url_json.id, quoted_status, url_json.user.screen_name
),
)
elif len(media_model) == 1:
if static:
await message.reply_document(
media_list[0],
caption=text,
quote=True,
parse_mode=ParseMode.HTML,
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
elif media_model[0] == "photo":
await message.reply_photo(
media_list[0],
caption=text,
parse_mode=ParseMode.HTML,
quote=True,
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
elif media_model[0] == "gif":
await message.reply_animation(
media_list[0],
caption=text,
parse_mode=ParseMode.HTML,
quote=True,
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
else:
await message.reply_video(
media_list[0],
caption=text,
parse_mode=ParseMode.HTML,
quote=True,
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
else:
await client.send_media_group(
message.chat.id,
media=twitter_media(text, media_model, media_list, static),
)
elif url.path == "/":
return
else:
# 解析用户
uid = url.path.replace("/", "")
url_json = None
with ThreadPoolExecutor() as executor:
for _ in range(3):
try:
future = client.loop.run_in_executor(
executor, twitter_api.GetUser, None, uid
)
url_json = await asyncio.wait_for(
future, timeout=30, loop=client.loop
)
except Exception as e:
print(e)
return
if url_json:
break
if not url_json:
return
text, user_username, status_link = get_twitter_user(url_json)
await message.reply_photo(
url_json.profile_image_url_https.replace("_normal", ""),
caption=text,
quote=True,
reply_markup=twitter_user_link(user_username, status_link),
)
except Exception as e:
print(e)
raise ContinuePropagation

View File

@ -1,18 +1,16 @@
pyrogram==2.0.96
pyrogram==2.0.106
tgcrypto==1.2.5
bilibili-api-python==15.4.4
httpx
pillow
cashews
coloredlogs
qrcode
pyncm
mutagen
playwright
uvicorn
jinja2
apscheduler
pytz
python-twitter
beautifulsoup4
lxml
sqlalchemy