diff --git a/config.gen.ini b/config.gen.ini
index 30b6640..703126f 100644
--- a/config.gen.ini
+++ b/config.gen.ini
@@ -13,12 +13,6 @@ port = 1080
[basic]
ipv6 = False
-[twitter]
-consumer_key = ABCD
-consumer_secret = ABCD
-access_token_key = ABCD
-access_token_secret = ABCD
-
[post]
admin = 0
lofter_channel = 0
@@ -26,3 +20,4 @@ lofter_channel_username = username
[api]
amap_key = ABCD
+bili_cookie = ABCD
diff --git a/defs/asoulcnki.py b/defs/asoulcnki.py
deleted file mode 100644
index 950ce1c..0000000
--- a/defs/asoulcnki.py
+++ /dev/null
@@ -1,115 +0,0 @@
-import time
-from io import BytesIO
-from PIL import Image
-import httpx
-import jinja2
-import random
-from os import sep
-from init import logger
-from defs.browser import html_to_pic
-from defs.diff import diff_text
-
-env = jinja2.Environment(enable_async=True)
-with open(f"resources{sep}templates{sep}article.html", "r", encoding="utf-8") as f:
- article_data = f.read()
-article_tpl = env.from_string(article_data)
-
-
-async def check_text(text: str):
- try:
- url = "https://asoulcnki.asia/v1/api/check"
- async with httpx.AsyncClient() as client:
- resp = await client.post(url=url, json={"text": text})
- result = resp.json()
-
- if result["code"] != 0:
- return None, None
-
- data = result["data"]
- if not data["related"]:
- return None, "没有找到重复的小作文捏"
-
- rate = data["rate"]
- related = data["related"][0]
- reply_url = str(related["reply_url"]).strip()
- reply = related["reply"]
-
- msg = [
- "枝网文本复制检测报告",
- "",
- "总复制比 {:.2f}%".format(rate * 100),
- f'相似小作文: 地点 - '
- f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["ctime"]))}',
- ]
-
- image = await render_reply(reply, diff=text)
- if not image:
- return None, "\n".join(msg)
- return image, "\n".join(msg)
- except Exception as e:
- logger.warning(f"Error in check_text: {e}")
- return None, None
-
-
-async def random_text(keyword: str = ""):
- try:
- url = "https://asoulcnki.asia/v1/api/ranking"
- params = {"pageSize": 10, "pageNum": 1, "timeRangeMode": 0, "sortMode": 0}
- if keyword:
- params["keywords"] = keyword
- else:
- params["pageNum"] = random.randint(1, 100)
-
- async with httpx.AsyncClient() as client:
- resp = await client.get(url=url, params=params)
- result = resp.json()
-
- if result["code"] != 0:
- return None, None
-
- replies = result["data"]["replies"]
- if not replies:
- return None, "没有找到小作文捏"
-
- reply = random.choice(replies)
- image = await render_reply(reply)
- reply_url = (
- f"https://t.bilibili.com/{reply['dynamic_id']}/#reply{reply['rpid']}"
- )
- if not image:
- return None, f'转到小作文'
- return image, f'转到小作文'
- except Exception as e:
- logger.warning(f"Error in random_text: {e}")
- return None, None
-
-
-async def render_reply(reply: dict, diff: str = ""):
- try:
- article = {}
- article["username"] = reply["m_name"]
- article["like"] = reply["like_num"]
- article["all_like"] = reply["similar_like_sum"]
- article["quote"] = reply["similar_count"]
- article["text"] = (
- diff_text(diff, reply["content"]) if diff else reply["content"]
- )
- article["time"] = time.strftime("%Y-%m-%d", time.localtime(reply["ctime"]))
-
- html = await article_tpl.render_async(article=article)
- img_raw = await html_to_pic(
- html, wait=0, viewport={"width": 500, "height": 100}
- )
- # 将bytes结果转化为字节流
- bytes_stream = BytesIO(img_raw)
- # 读取到图片
- img = Image.open(bytes_stream)
- imgByteArr = BytesIO() # 初始化一个空字节流
- img.save(imgByteArr, format("PNG")) # 把我们得图片以 PNG 保存到空字节流
- imgByteArr = imgByteArr.getvalue() # 无视指针,获取全部内容,类型由io流变成bytes。
- with open(f"data{sep}asoulcnki.png", "wb") as i:
- i.write(imgByteArr)
- return f"data{sep}asoulcnki.png"
- except Exception as e:
- logger.warning(f"Error in render_reply: {e}")
- return None
diff --git a/defs/bilibili.py b/defs/bilibili.py
index 7120a5b..509776e 100644
--- a/defs/bilibili.py
+++ b/defs/bilibili.py
@@ -1,19 +1,44 @@
import re
from os import sep
+from typing import Optional
import qrcode
import string
+from bilibili_api import Credential
+from bilibili_api.video import Video
+from bilibili_api.user import User
from pyrogram import ContinuePropagation
from qrcode.image.pil import PilImage
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
+from defs.cookie import get_bili_cookie, get_bili_browser_cookie
from defs.browser import get_browser
-from headers import bili_headers
from init import request
+def from_cookie_get_credential() -> Optional[Credential]:
+ """
+ 从 cookie 中获取 Credential 对象。
+
+ Returns:
+ Credential: Credential 对象。
+ """
+ cookie = get_bili_cookie()
+ try:
+ sessdata = cookie["SESSDATA"]
+ bili_jct = cookie["bili_jct"]
+ buvid3 = cookie["buvid3"]
+ dedeuserid = cookie["DedeUserID"]
+ except KeyError:
+ return None
+ return Credential(sessdata, bili_jct, buvid3, dedeuserid)
+
+
+credential = from_cookie_get_credential()
+
+
def cut_text(old_str, cut):
"""
:说明: `get_cut_str`
@@ -39,7 +64,7 @@ def cut_text(old_str, cut):
next_str = next_str[1:]
elif s == "\n":
str_list.append(next_str[: i - 1])
- next_str = next_str[i - 1 :]
+ next_str = next_str[i - 1:]
si = 0
i = 0
continue
@@ -88,17 +113,12 @@ async def b23_extract(text):
async def video_info_get(cid):
if cid[:2] == "av":
- video_info = await request.get(
- f"https://api.bilibili.com/x/web-interface/view?aid={cid[2:]}"
- )
- video_info = video_info.json()
+ v = Video(aid=cid[2:], credential=credential)
elif cid[:2] == "BV":
- video_info = await request.get(
- f"https://api.bilibili.com/x/web-interface/view?bvid={cid}"
- )
- video_info = video_info.json()
+ v = Video(bvid=cid, credential=credential)
else:
return
+ video_info = await v.get_info()
return video_info
@@ -115,7 +135,7 @@ def numf(num: int):
async def binfo_image_create(video_info: dict):
bg_y = 0
# 封面
- pic_url = video_info["data"]["pic"]
+ pic_url = video_info["pic"]
pic_get = (await request.get(pic_url)).content
pic_bio = BytesIO(pic_get)
pic = Image.open(pic_bio)
@@ -125,7 +145,7 @@ async def binfo_image_create(video_info: dict):
bg_y += 350 + 20
# 时长
- minutes, seconds = divmod(video_info["data"]["duration"], 60)
+ minutes, seconds = divmod(video_info["duration"], 60)
hours, minutes = divmod(minutes, 60)
video_time = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
tiem_font = ImageFont.truetype(
@@ -135,12 +155,12 @@ async def binfo_image_create(video_info: dict):
draw.text((10, 305), video_time, "white", tiem_font)
# 分区
- tname = video_info["data"]["tname"]
+ tname = video_info["tname"]
tname_x, _ = tiem_font.getsize(tname)
draw.text((560 - tname_x - 10, 305), tname, "white", tiem_font)
# 标题
- title = video_info["data"]["title"]
+ title = video_info["title"]
title_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 25
)
@@ -154,7 +174,7 @@ async def binfo_image_create(video_info: dict):
# 简介
dynamic = (
- "该视频没有简介" if video_info["data"]["desc"] == "" else video_info["data"]["desc"]
+ "该视频没有简介" if video_info["desc"] == "" else video_info["desc"]
)
dynamic_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18
@@ -175,11 +195,11 @@ async def binfo_image_create(video_info: dict):
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 26
)
- view = numf(video_info["data"]["stat"]["view"]) # 播放 \uE6E6
- danmaku = numf(video_info["data"]["stat"]["danmaku"]) # 弹幕 \uE6E7
- favorite = numf(video_info["data"]["stat"]["favorite"]) # 收藏 \uE6E1
- coin = numf(video_info["data"]["stat"]["coin"]) # 投币 \uE6E4
- like = numf(video_info["data"]["stat"]["like"]) # 点赞 \uE6E0
+ view = numf(video_info["stat"]["view"]) # 播放 \uE6E6
+ danmaku = numf(video_info["stat"]["danmaku"]) # 弹幕 \uE6E7
+ favorite = numf(video_info["stat"]["favorite"]) # 收藏 \uE6E1
+ coin = numf(video_info["stat"]["coin"]) # 投币 \uE6E4
+ like = numf(video_info["stat"]["like"]) # 点赞 \uE6E0
info_bg = Image.new("RGB", (560, 170), "#F5F5F7")
draw = ImageDraw.Draw(info_bg)
@@ -199,46 +219,39 @@ async def binfo_image_create(video_info: dict):
# UP主
# 等级 0-4 \uE6CB-F 5-6\uE6D0-1
# UP \uE723
- if "staff" in video_info["data"]:
+ if "staff" in video_info:
up_list = []
- for up in video_info["data"]["staff"]:
+ for up in video_info["staff"]:
up_mid = up["mid"]
- up_data = (await request.get(
- f"https://api.bilibili.com/x/space/acc/info?mid={up_mid}",
- headers=bili_headers,
- )).json()
+ u = User(up_mid, credential=credential)
+ up_data = await u.get_user_info()
up_list.append(
{
"name": up["name"],
"up_title": up["title"],
"face": up["face"],
- "color": up_data["data"]["vip"]["nickname_color"]
- if up_data["data"]["vip"]["nickname_color"] != ""
+ "color": up_data["vip"]["nickname_color"]
+ if up_data["vip"]["nickname_color"] != ""
else "black",
"follower": up["follower"],
- "level": up_data["data"]["level"],
+ "level": up_data["level"],
}
)
else:
- up_mid = video_info["data"]["owner"]["mid"]
- up_data = (await request.get(
- f"https://api.bilibili.com/x/space/wbi/acc/info?mid={up_mid}",
- headers=bili_headers,
- )).json()
- up_stat = (await request.get(
- f"https://api.bilibili.com/x/relation/stat?vmid={up_mid}",
- headers=bili_headers,
- )).json()
+ up_mid = video_info["owner"]["mid"]
+ u = User(up_mid, credential=credential)
+ up_data = await u.get_user_info()
+ up_stat = await u.get_relation_info()
up_list = [
{
- "name": up_data["data"]["name"],
+ "name": up_data["name"],
"up_title": "UP主",
- "face": up_data["data"]["face"],
- "color": up_data["data"]["vip"]["nickname_color"]
- if up_data["data"]["vip"]["nickname_color"] != ""
+ "face": up_data["face"],
+ "color": up_data["vip"]["nickname_color"]
+ if up_data["vip"]["nickname_color"] != ""
else "black",
- "follower": up_stat["data"]["follower"],
- "level": up_data["data"]["level"],
+ "follower": up_stat["follower"],
+ "level": up_data["level"],
}
]
up_num = len(up_list)
@@ -330,7 +343,7 @@ async def binfo_image_create(video_info: dict):
draw = ImageDraw.Draw(baner_bg)
# 二维码
qr = qrcode.QRCode(border=1)
- qr.add_data("https://b23.tv/" + video_info["data"]["bvid"])
+ qr.add_data("https://b23.tv/" + video_info["bvid"])
qr_image = qr.make_image(PilImage, fill_color=icon_color, back_color="#F5F5F7")
qr_image = qr_image.resize((140, 140))
baner_bg.paste(qr_image, (50, 10))
@@ -365,16 +378,7 @@ async def get_dynamic_screenshot_pc(dynamic_id):
viewport={"width": 2560, "height": 1080},
device_scale_factor=2,
)
- await context.add_cookies(
- [
- {
- "name": "hit-dyn-v2",
- "value": "1",
- "domain": ".bilibili.com",
- "path": "/",
- }
- ]
- )
+ await context.add_cookies(get_bili_browser_cookie())
page = await context.new_page()
try:
await page.goto(url, wait_until="networkidle", timeout=10000)
diff --git a/defs/cookie.py b/defs/cookie.py
new file mode 100644
index 0000000..d5bff5a
--- /dev/null
+++ b/defs/cookie.py
@@ -0,0 +1,27 @@
+from typing import Dict, List
+
+from defs.glover import bili_cookie
+
+
+def get_bili_cookie() -> Dict[str, str]:
+ data = {}
+ for i in bili_cookie.split(";"):
+ if i:
+ k, v = i.split("=")
+ data[k] = v
+ return data
+
+
+def get_bili_browser_cookie() -> List[Dict[str, str]]:
+ cookie = get_bili_cookie()
+ data = []
+ for k, v in cookie.items():
+ data.append(
+ {
+ "name": k,
+ "value": v,
+ "domain": ".bilibili.com",
+ "path": "/",
+ }
+ )
+ return data
diff --git a/defs/glover.py b/defs/glover.py
index e4bd162..377a45b 100644
--- a/defs/glover.py
+++ b/defs/glover.py
@@ -7,36 +7,26 @@ api_id: int = 0
api_hash: str = ""
# [Basic]
ipv6: Union[bool, str] = "False"
-# [twitter]
-consumer_key: str = ""
-consumer_secret: str = ""
-access_token_key: str = ""
-access_token_secret: str = ""
# [post]
admin: int = 0
lofter_channel: int = 0
lofter_channel_username: str = ""
# [api]
amap_key: str = ""
-
+bili_cookie: str = ""
config = RawConfigParser()
config.read("config.ini")
api_id = config.getint("pyrogram", "api_id", fallback=api_id)
api_hash = config.get("pyrogram", "api_hash", fallback=api_hash)
ipv6 = config.get("basic", "ipv6", fallback=ipv6)
-consumer_key = config.get("twitter", "consumer_key", fallback=consumer_key)
-consumer_secret = config.get("twitter", "consumer_secret", fallback=consumer_secret)
-access_token_key = config.get("twitter", "access_token_key", fallback=access_token_key)
-access_token_secret = config.get(
- "twitter", "access_token_secret", fallback=access_token_secret
-)
admin = config.getint("post", "admin", fallback=admin)
lofter_channel = config.getint("post", "lofter_channel", fallback=lofter_channel)
lofter_channel_username = config.get(
"post", "lofter_channel_username", fallback=lofter_channel_username
)
amap_key = config.get("api", "amap_key", fallback=amap_key)
+bili_cookie = config.get("api", "bili_cookie", fallback=bili_cookie)
try:
- ipv6 = strtobool(ipv6)
+ ipv6 = bool(strtobool(ipv6))
except ValueError:
ipv6 = False
diff --git a/defs/mihoyo_bbs.py b/defs/mihoyo_bbs.py
deleted file mode 100644
index 1f0e803..0000000
--- a/defs/mihoyo_bbs.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from defs.browser import get_browser
-
-
-async def get_mihoyo_screenshot(url):
- browser = await get_browser()
- context = await browser.new_context(
- viewport={"width": 2560, "height": 1080},
- device_scale_factor=2,
- )
- page = await context.new_page()
- try:
- await page.goto(url, wait_until="networkidle", timeout=180000)
- # 被删除或者进审核了
- if page.url == "https://www.miyoushe.com/ys/404":
- return None
- card = await page.wait_for_selector(
- ".mhy-article-page__main", timeout=180000, state="visible"
- )
- assert card
- clip = await card.bounding_box()
- assert clip
- clip["width"] += 310
- return await page.screenshot(clip=clip, full_page=True)
- except Exception as e:
- print(f"截取米哈游帖子时发生错误:{e}")
- return await page.screenshot(full_page=True)
- finally:
- await context.close()
diff --git a/defs/twitter_api.py b/defs/twitter_api.py
deleted file mode 100644
index f57388d..0000000
--- a/defs/twitter_api.py
+++ /dev/null
@@ -1,254 +0,0 @@
-import contextlib
-from datetime import datetime, timedelta
-from defs.glover import (
- consumer_key,
- consumer_secret,
- access_token_key,
- access_token_secret,
-)
-
-import twitter
-
-from pyrogram.enums import ParseMode
-from pyrogram.types import (
- InlineKeyboardMarkup,
- InlineKeyboardButton,
- InputMediaPhoto,
- InputMediaVideo,
- InputMediaDocument,
- InputMediaAnimation,
-)
-
-twitter_api = twitter.Api(
- consumer_key=consumer_key,
- consumer_secret=consumer_secret,
- access_token_key=access_token_key,
- access_token_secret=access_token_secret,
- tweet_mode="extended",
- timeout=30,
-)
-
-
-def twitter_link(status_id, qid, uid):
- if qid:
- return InlineKeyboardMarkup(
- [
- [
- InlineKeyboardButton(
- text="Source",
- url=f"https://twitter.com/{uid}/status/{status_id}",
- ),
- InlineKeyboardButton(
- text="RSource", url=f"https://twitter.com/{qid}"
- ),
- InlineKeyboardButton(
- text="Author", url=f"https://twitter.com/{uid}"
- ),
- ]
- ]
- )
- else:
- return InlineKeyboardMarkup(
- [
- [
- InlineKeyboardButton(
- text="Source",
- url=f"https://twitter.com/{uid}/status/{status_id}",
- ),
- InlineKeyboardButton(
- text="Author", url=f"https://twitter.com/{uid}"
- ),
- ]
- ]
- )
-
-
-def twitter_user_link(user_username, status_link):
- return (
- InlineKeyboardMarkup(
- [
- [
- InlineKeyboardButton(
- text="Author", url=f"https://twitter.com/{user_username}"
- ),
- InlineKeyboardButton(text="Status", url=status_link),
- ]
- ]
- )
- if status_link
- else InlineKeyboardMarkup(
- [
- [
- InlineKeyboardButton(
- text="Author", url=f"https://twitter.com/{user_username}"
- )
- ]
- ]
- )
- )
-
-
-def twitter_media(text, media_model, media_list, static: bool = False):
- media_lists = []
- for ff in range(len(media_model)):
- if static:
- media_lists.append(
- InputMediaDocument(
- media_list[ff],
- caption=text if ff == 0 else None,
- parse_mode=ParseMode.HTML,
- )
- )
- elif media_model[ff] == "photo":
- media_lists.append(
- InputMediaPhoto(
- media_list[ff],
- caption=text if ff == 0 else None,
- parse_mode=ParseMode.HTML,
- )
- )
- elif media_model[ff] == "gif":
- media_lists.append(
- InputMediaAnimation(
- media_list[ff],
- caption=text if ff == 0 else None,
- parse_mode=ParseMode.HTML,
- )
- )
- else:
- media_lists.append(
- InputMediaVideo(
- media_list[ff],
- caption=text if ff == 0 else None,
- parse_mode=ParseMode.HTML,
- )
- )
- return media_lists
-
-
-def get_twitter_time(date: str) -> str:
- try:
- date = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + timedelta(
- hours=8
- )
- return date.strftime("%Y-%m-%d %H:%M:%S")
- except Exception:
- return date
-
-
-def get_twitter_user(url_json):
- user_name = url_json.name
- user_username = url_json.screen_name
- status = ""
- status_link = None
- verified = "💎" if url_json.verified else ""
- protected = "🔒" if url_json.protected else ""
- if url_json.status:
- status_link = f"https://twitter.com/{user_username}/{url_json.status.id_str}"
- status = f'🆕 New Status: {get_twitter_time(url_json.status.created_at)}\n'
- text = (
- f"Twitter User Info\n\n"
- f"Name: {verified}{protected}{user_name}
\n"
- f'Username: @{user_username}\n'
- f"Bio: {url_json.description}
\n"
- f"Joined: {get_twitter_time(url_json.created_at)}
\n"
- f"{status}"
- f"📤 {url_json.statuses_count} ❤️{url_json.favourites_count} "
- f"粉丝 {url_json.followers_count} 关注 {url_json.friends_count}"
- )
- return text, user_username, status_link
-
-
-def get_twitter_status(url_json):
- created_at = get_twitter_time(url_json.created_at)
- favorite_count = (
- url_json.favorite_count if hasattr(url_json, "favorite_count") else 0
- )
- retweet_count = url_json.retweet_count if hasattr(url_json, "retweet_count") else 0
- user_name = url_json.user.name
- user_username = url_json.user.screen_name
- text = url_json.full_text if hasattr(url_json, "full_text") else "暂 无 内 容"
- text = f"{text}
"
- verified = ""
- protected = ""
- if url_json.user.verified:
- verified = "💎"
- if url_json.user.protected:
- protected = "🔒"
- user_text = (
- f'{verified}{protected}{user_name} 发表于 {created_at}'
- f"\n👍 {favorite_count} 🔁 {retweet_count}"
- )
- media_model = []
- media_list = []
- media_alt_list = []
- with contextlib.suppress(Exception):
- media_info = url_json.media
- for i in media_info:
- media_url = i.url if hasattr(i, "url") else None
- if media_url:
- text = text.replace(media_url, "")
- if i.type == "photo":
- media_model.append("photo")
- media_list.append(i.media_url_https)
- elif i.type == "animated_gif":
- media_model.append("gif")
- media_list.append(i.video_info["variants"][0]["url"])
- else:
- media_model.append("video")
- for f in i.video_info["variants"]:
- if f["content_type"] == "video/mp4":
- media_list.append(f["url"])
- break
- try:
- media_alt_list.append(i.ext_alt_text)
- except:
- media_alt_list.append("")
- quoted_status = False
- with contextlib.suppress(Exception):
- quoted = url_json.quoted_status
- quoted_status = (
- quoted.user.screen_name + "/status/" + url_json.quoted_status_id_str
- )
- quoted_created_at = get_twitter_time(quoted.created_at)
- quoted_favorite_count = (
- quoted.favorite_count if hasattr(quoted, "favorite_count") else 0
- )
- quoted_retweet_count = (
- quoted.retweet_count if hasattr(quoted, "retweet_count") else 0
- )
- quoted_user_name = quoted.user.name
- quoted_user_username = quoted.user.screen_name
- quoted_text = quoted.full_text if hasattr(quoted, "full_text") else "暂 无 内 容"
- text += f"\n\n> {quoted_text}
"
- quoted_verified = ""
- quoted_protected = ""
- if quoted.user.verified:
- quoted_verified = "💎"
- if quoted.user.protected:
- quoted_protected = "🔒"
- user_text += (
- f'\n> {quoted_verified}{quoted_protected}'
- f"{quoted_user_name} 发表于 {quoted_created_at}"
- f"\n👍 {quoted_favorite_count} 🔁 {quoted_retweet_count}"
- )
- with contextlib.suppress(Exception):
- quoted_media_info = quoted.media
- for i in quoted_media_info:
- media_url = i.url if hasattr(i, "url") else None
- if media_url:
- text = text.replace(media_url, "")
- if i.type == "photo":
- media_model.append("photo")
- media_list.append(i.media_url_https)
- elif i.type == "animated_gif":
- media_model.append("gif")
- media_list.append(i.video_info["variants"][0]["url"])
- else:
- media_model.append("video")
- media_list.append(i.video_info["variants"][0]["url"])
- try:
- media_alt_list.append(i.ext_alt_text)
- except:
- media_alt_list.append("")
- return text, user_text, media_model, media_list, quoted_status
diff --git a/headers.py b/headers.py
index 5165ba1..9010bf0 100644
--- a/headers.py
+++ b/headers.py
@@ -1,6 +1,3 @@
-bili_headers = {
- "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36"
-}
FANBOX_HEADERS = {
"authority": "api.fanbox.cc",
"accept": "application/json, text/plain, */*",
diff --git a/modules/bilibili.py b/modules/bilibili.py
index 9d48788..c0bf047 100644
--- a/modules/bilibili.py
+++ b/modules/bilibili.py
@@ -29,13 +29,13 @@ async def bili_resolve(_: Client, message: Message):
if video_number:
video_number = video_number[0]
video_info = await video_info_get(video_number) if video_number else None
- if video_info and video_info["code"] == 0:
+ if video_info:
image = await binfo_image_create(video_info)
await message.reply_photo(
image,
quote=True,
reply_markup=gen_button(
- [Button(0, "Link", "https://b23.tv/" + video_info["data"]["bvid"])]
+ [Button(0, "Link", "https://b23.tv/" + video_info["bvid"])]
),
)
raise ContinuePropagation
diff --git a/modules/dc.py b/modules/dc.py
index 97869f4..f08d0d9 100644
--- a/modules/dc.py
+++ b/modules/dc.py
@@ -42,7 +42,7 @@ async def dc_command(_: Client, message: Message):
}
dc, mention = get_dc(message)
if dc:
- text = f"{mention}所在数据中心为: DC{dc}\n" f"该数据中心位于 {geo_dic[str(dc)]}"
+ text = f"{mention}所在数据中心为: DC{dc}\n该数据中心位于 {geo_dic[str(dc)]}"
else:
text = f"{mention}需要先设置头像并且对我可见。"
await message.reply(text)
diff --git a/modules/fragment.py b/modules/fragment.py
index 7e44eef..7a4d805 100644
--- a/modules/fragment.py
+++ b/modules/fragment.py
@@ -78,7 +78,7 @@ async def fragment_inline(_, inline_query: InlineQuery):
user = await parse_fragment(username)
text = user.text
except NotAvailable:
- text = f"用户名:@{username}\n" f"状态:暂未开放购买\n"
+ text = f"用户名:@{username}\n状态:暂未开放购买\n"
except Exception:
text = ""
if not text:
diff --git a/modules/geo.py b/modules/geo.py
index c597766..1560452 100644
--- a/modules/geo.py
+++ b/modules/geo.py
@@ -52,4 +52,4 @@ async def geo_command(_: Client, message: Message):
msg = await message.reply_location(
longitude=float(lat), latitude=float(lon), quote=True
)
- await msg.reply(f"坐标:`{lat},{lon}`\n" f"地址:{formatted_address}", quote=True)
+ await msg.reply(f"坐标:`{lat},{lon}`\n地址:{formatted_address}", quote=True)
diff --git a/modules/ip.py b/modules/ip.py
index 0012e13..35ef700 100644
--- a/modules/ip.py
+++ b/modules/ip.py
@@ -34,9 +34,7 @@ async def ip_command(_: Client, message: Message):
"org,as,mobile,proxy,hosting,query"
)
).json()
- if ipinfo_json["status"] == "fail":
- pass
- elif ipinfo_json["status"] == "success":
+ if ipinfo_json["status"] == "success":
rep_text = ip_info(url, ipinfo_json)
text = ""
if message.entities:
@@ -60,13 +58,11 @@ async def ip_command(_: Client, message: Message):
"org,as,mobile,proxy,hosting,query"
)
).json()
- if ipinfo_json["status"] == "fail":
- pass
- elif ipinfo_json["status"] == "success":
+ if ipinfo_json["status"] == "success":
text = ip_info(url, ipinfo_json)
if text == "":
url = message.text[4:]
- if not url == "":
+ if url != "":
ipinfo_json = (
await request.get(
"http://ip-api.com/json/"
@@ -76,13 +72,11 @@ async def ip_command(_: Client, message: Message):
"org,as,mobile,proxy,hosting,query"
)
).json()
- if ipinfo_json["status"] == "fail":
- pass
- elif ipinfo_json["status"] == "success":
+ if ipinfo_json["status"] == "success":
text = ip_info(url, ipinfo_json)
if rep_text == "" and text == "":
await msg.edit("没有找到要查询的 ip/域名 ...")
- elif not rep_text == "" and not text == "":
+ elif rep_text != "" and text != "":
await msg.edit(f"{rep_text}\n================\n{text}")
else:
await msg.edit(f"{rep_text}{text}")
diff --git a/modules/mihoyo_bbs.py b/modules/mihoyo_bbs.py
deleted file mode 100644
index 15eee2e..0000000
--- a/modules/mihoyo_bbs.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import re
-from io import BytesIO
-
-from pyrogram import Client, filters, ContinuePropagation
-from pyrogram.types import Message
-
-from PIL import Image
-
-from defs.mihoyo_bbs import get_mihoyo_screenshot
-from defs.button import gen_button, Button
-
-
-@Client.on_message(
- filters.incoming
- & filters.text
- & filters.regex(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+")
-)
-async def bili_dynamic(_: Client, message: Message):
- # sourcery skip: use-named-expression
- try:
- p = re.compile(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+")
- article = p.search(message.text)
- if article:
- article_url = article.group()
- if not article_url.startswith(("https://", "http://")):
- article_url = f"https://{article_url}"
- image = await get_mihoyo_screenshot(article_url)
- if image:
- # 将bytes结果转化为字节流
- photo = BytesIO(image)
- photo.name = "screenshot.png"
- pillow_photo = Image.open(BytesIO(image))
- width, height = pillow_photo.size
- if abs(height - width) > 1300:
- await message.reply_document(
- document=photo,
- quote=True,
- reply_markup=gen_button([Button(0, "Link", article_url)]),
- )
- else:
- await message.reply_photo(
- photo,
- quote=True,
- reply_markup=gen_button([Button(0, "Link", article_url)]),
- )
- except Exception as e:
- print(f"截取米哈游帖子时发生错误:{e}")
- raise ContinuePropagation
diff --git a/modules/start.py b/modules/start.py
index 510f1a4..b2bcec2 100644
--- a/modules/start.py
+++ b/modules/start.py
@@ -5,7 +5,6 @@ from defs.button import gen_button, Button
des = """本机器人特性:
★ 解析 bilibili 视频、动态
-★ 解析 twitter 推文、用户
★ 解析 lofter 日志、用户
★ 解析 fanbox 发帖、用户
★ 汇率查询
diff --git a/modules/twitter_api.py b/modules/twitter_api.py
deleted file mode 100644
index 50220cb..0000000
--- a/modules/twitter_api.py
+++ /dev/null
@@ -1,160 +0,0 @@
-import asyncio
-from urllib.parse import urlparse
-from concurrent.futures import ThreadPoolExecutor
-
-from pyrogram import Client, filters, ContinuePropagation
-from pyrogram.enums import MessageEntityType, ParseMode
-from pyrogram.types import Message
-
-from defs.twitter_api import (
- twitter_api,
- get_twitter_status,
- twitter_link,
- twitter_media,
- twitter_user_link,
- get_twitter_user,
-)
-
-
-@Client.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/"))
-async def twitter_share(client: Client, message: Message):
- if not message.text:
- return
- static = "static" in message.text
- try:
- for num in range(len(message.entities)):
- entity = message.entities[num]
- if entity.type == MessageEntityType.URL:
- url = message.text[entity.offset : entity.offset + entity.length]
- elif entity.type == MessageEntityType.TEXT_LINK:
- url = entity.url
- else:
- continue
- url = urlparse(url)
- if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]:
- if url.path.find("status") >= 0:
- status_id = str(
- url.path[url.path.find("status") + 7 :].split("/")[0]
- ).split("?")[0]
- url_json = None
- with ThreadPoolExecutor() as executor:
- for _ in range(3):
- try:
- future = client.loop.run_in_executor(
- executor, twitter_api.GetStatus, status_id
- )
- url_json = await asyncio.wait_for(
- future, timeout=30, loop=client.loop
- )
- except Exception as e:
- print(e)
- return
- if url_json:
- break
- if not url_json:
- return
- (
- text,
- user_text,
- media_model,
- media_list,
- quoted_status,
- ) = get_twitter_status(url_json)
- text = f"Twitter Status Info\n\n{text}\n\n{user_text}"
- if len(media_model) == 0:
- await client.send_message(
- message.chat.id,
- text,
- parse_mode=ParseMode.HTML,
- disable_web_page_preview=True,
- reply_to_message_id=message.id,
- reply_markup=twitter_link(
- url_json.id, quoted_status, url_json.user.screen_name
- ),
- )
- elif len(media_model) == 1:
- if static:
- await message.reply_document(
- media_list[0],
- caption=text,
- quote=True,
- parse_mode=ParseMode.HTML,
- reply_markup=twitter_link(
- url_json.id,
- quoted_status,
- url_json.user.screen_name,
- ),
- )
- elif media_model[0] == "photo":
- await message.reply_photo(
- media_list[0],
- caption=text,
- parse_mode=ParseMode.HTML,
- quote=True,
- reply_markup=twitter_link(
- url_json.id,
- quoted_status,
- url_json.user.screen_name,
- ),
- )
- elif media_model[0] == "gif":
- await message.reply_animation(
- media_list[0],
- caption=text,
- parse_mode=ParseMode.HTML,
- quote=True,
- reply_markup=twitter_link(
- url_json.id,
- quoted_status,
- url_json.user.screen_name,
- ),
- )
- else:
- await message.reply_video(
- media_list[0],
- caption=text,
- parse_mode=ParseMode.HTML,
- quote=True,
- reply_markup=twitter_link(
- url_json.id,
- quoted_status,
- url_json.user.screen_name,
- ),
- )
- else:
- await client.send_media_group(
- message.chat.id,
- media=twitter_media(text, media_model, media_list, static),
- )
- elif url.path == "/":
- return
- else:
- # 解析用户
- uid = url.path.replace("/", "")
- url_json = None
- with ThreadPoolExecutor() as executor:
- for _ in range(3):
- try:
- future = client.loop.run_in_executor(
- executor, twitter_api.GetUser, None, uid
- )
- url_json = await asyncio.wait_for(
- future, timeout=30, loop=client.loop
- )
- except Exception as e:
- print(e)
- return
- if url_json:
- break
- if not url_json:
- return
- text, user_username, status_link = get_twitter_user(url_json)
- await message.reply_photo(
- url_json.profile_image_url_https.replace("_normal", ""),
- caption=text,
- quote=True,
- reply_markup=twitter_user_link(user_username, status_link),
- )
- except Exception as e:
- print(e)
- raise ContinuePropagation
diff --git a/requirements.txt b/requirements.txt
index cc65f73..cab1496 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,18 +1,16 @@
-pyrogram==2.0.96
+pyrogram==2.0.106
tgcrypto==1.2.5
+bilibili-api-python==15.4.4
httpx
pillow
cashews
coloredlogs
qrcode
-pyncm
-mutagen
playwright
uvicorn
jinja2
apscheduler
pytz
-python-twitter
beautifulsoup4
lxml
sqlalchemy