♻️ 格式化代码

This commit is contained in:
xtaodada 2023-01-12 21:19:54 +08:00
parent 0600c4627d
commit 4a0ae1500d
Signed by: xtaodada
GPG Key ID: 4CBB3F4FA8C85659
42 changed files with 952 additions and 486 deletions

View File

@ -22,8 +22,12 @@ async def what_time(message: str) -> str:
async def how_long(message: str) -> str:
unit = ["", "小时", "", "", "", "", "世纪"]
while re.findall("多久|多长时间", message):
message = message.replace("多久", str(secrets.choice(range(99))) + secrets.choice(unit), 1)
message = message.replace("多长时间", str(secrets.choice(range(99))) + secrets.choice(unit), 1)
message = message.replace(
"多久", str(secrets.choice(range(99))) + secrets.choice(unit), 1
)
message = message.replace(
"多长时间", str(secrets.choice(range(99))) + secrets.choice(unit), 1
)
return message

View File

@ -17,28 +17,30 @@ article_tpl = env.from_string(article_data)
async def check_text(text: str):
try:
url = 'https://asoulcnki.asia/v1/api/check'
url = "https://asoulcnki.asia/v1/api/check"
async with httpx.AsyncClient() as client:
resp = await client.post(url=url, json={'text': text})
resp = await client.post(url=url, json={"text": text})
result = resp.json()
if result['code'] != 0:
if result["code"] != 0:
return None, None
data = result['data']
if not data['related']:
return None, '没有找到重复的小作文捏'
data = result["data"]
if not data["related"]:
return None, "没有找到重复的小作文捏"
rate = data['rate']
related = data['related'][0]
reply_url = str(related['reply_url']).strip()
reply = related['reply']
rate = data["rate"]
related = data["related"][0]
reply_url = str(related["reply_url"]).strip()
reply = related["reply"]
msg = ['枝网文本复制检测报告',
'',
'总复制比 {:.2f}%'.format(rate * 100),
f'相似小作文: <a href="{reply_url}">地点</a> - '
f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["ctime"]))}',]
msg = [
"枝网文本复制检测报告",
"",
"总复制比 {:.2f}%".format(rate * 100),
f'相似小作文: <a href="{reply_url}">地点</a> - '
f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["ctime"]))}',
]
image = await render_reply(reply, diff=text)
if not image:
@ -51,32 +53,29 @@ async def check_text(text: str):
async def random_text(keyword: str = ""):
try:
url = 'https://asoulcnki.asia/v1/api/ranking'
params = {
'pageSize': 10,
'pageNum': 1,
'timeRangeMode': 0,
'sortMode': 0
}
url = "https://asoulcnki.asia/v1/api/ranking"
params = {"pageSize": 10, "pageNum": 1, "timeRangeMode": 0, "sortMode": 0}
if keyword:
params['keywords'] = keyword
params["keywords"] = keyword
else:
params['pageNum'] = random.randint(1, 100)
params["pageNum"] = random.randint(1, 100)
async with httpx.AsyncClient() as client:
resp = await client.get(url=url, params=params)
result = resp.json()
if result['code'] != 0:
if result["code"] != 0:
return None, None
replies = result['data']['replies']
replies = result["data"]["replies"]
if not replies:
return None, '没有找到小作文捏'
return None, "没有找到小作文捏"
reply = random.choice(replies)
image = await render_reply(reply)
reply_url = f"https://t.bilibili.com/{reply['dynamic_id']}/#reply{reply['rpid']}"
reply_url = (
f"https://t.bilibili.com/{reply['dynamic_id']}/#reply{reply['rpid']}"
)
if not image:
return None, f'<a href="{reply_url}">转到小作文</a>'
return image, f'<a href="{reply_url}">转到小作文</a>'
@ -88,25 +87,27 @@ async def random_text(keyword: str = ""):
async def render_reply(reply: dict, diff: str = ""):
try:
article = {}
article['username'] = reply['m_name']
article['like'] = reply['like_num']
article['all_like'] = reply['similar_like_sum']
article['quote'] = reply['similar_count']
article['text'] = diff_text(
diff, reply['content']) if diff else reply['content']
article['time'] = time.strftime(
"%Y-%m-%d", time.localtime(reply['ctime']))
article["username"] = reply["m_name"]
article["like"] = reply["like_num"]
article["all_like"] = reply["similar_like_sum"]
article["quote"] = reply["similar_count"]
article["text"] = (
diff_text(diff, reply["content"]) if diff else reply["content"]
)
article["time"] = time.strftime("%Y-%m-%d", time.localtime(reply["ctime"]))
html = await article_tpl.render_async(article=article)
img_raw = await html_to_pic(html, wait=0, viewport={"width": 500, "height": 100})
img_raw = await html_to_pic(
html, wait=0, viewport={"width": 500, "height": 100}
)
# 将bytes结果转化为字节流
bytes_stream = BytesIO(img_raw)
# 读取到图片
img = Image.open(bytes_stream)
imgByteArr = BytesIO() # 初始化一个空字节流
img.save(imgByteArr, format('PNG')) # 把我们得图片以 PNG 保存到空字节流
img.save(imgByteArr, format("PNG")) # 把我们得图片以 PNG 保存到空字节流
imgByteArr = imgByteArr.getvalue() # 无视指针获取全部内容类型由io流变成bytes。
with open(f"data{sep}asoulcnki.png", 'wb') as i:
with open(f"data{sep}asoulcnki.png", "wb") as i:
i.write(imgByteArr)
return f"data{sep}asoulcnki.png"
except Exception as e:

View File

@ -39,7 +39,7 @@ def cut_text(old_str, cut):
next_str = next_str[1:]
elif s == "\n":
str_list.append(next_str[: i - 1])
next_str = next_str[i - 1:]
next_str = next_str[i - 1 :]
si = 0
i = 0
continue
@ -93,7 +93,9 @@ async def video_info_get(cid):
)
video_info = video_info.json()
elif cid[:2] == "BV":
video_info = httpx.get(f"http://api.bilibili.com/x/web-interface/view?bvid={cid}")
video_info = httpx.get(
f"http://api.bilibili.com/x/web-interface/view?bvid={cid}"
)
video_info = video_info.json()
else:
return
@ -126,7 +128,9 @@ def binfo_image_create(video_info: dict):
minutes, seconds = divmod(video_info["data"]["duration"], 60)
hours, minutes = divmod(minutes, 60)
video_time = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
tiem_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 30)
tiem_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 30
)
draw = ImageDraw.Draw(pic)
draw.text((10, 305), video_time, "white", tiem_font)
@ -137,7 +141,9 @@ def binfo_image_create(video_info: dict):
# 标题
title = video_info["data"]["title"]
title_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 25)
title_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 25
)
title_cut_str = "\n".join(cut_text(title, 40))
_, title_text_y = title_font.getsize_multiline(title_cut_str)
title_bg = Image.new("RGB", (560, title_text_y + 23), "#F5F5F7")
@ -150,7 +156,9 @@ def binfo_image_create(video_info: dict):
dynamic = (
"该视频没有简介" if video_info["data"]["desc"] == "" else video_info["data"]["desc"]
)
dynamic_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18)
dynamic_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18
)
dynamic_cut_str = "\n".join(cut_text(dynamic, 58))
_, dynamic_text_y = dynamic_font.getsize_multiline(dynamic_cut_str)
dynamic_bg = Image.new("RGB", (560, dynamic_text_y + 24), "#F5F5F7")
@ -163,7 +171,9 @@ def binfo_image_create(video_info: dict):
# 视频数据
icon_font = ImageFont.truetype(f"resources{sep}font{sep}vanfont.ttf", 46)
icon_color = (247, 145, 185)
info_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 26)
info_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 26
)
view = numf(video_info["data"]["stat"]["view"]) # 播放 \uE6E6
danmaku = numf(video_info["data"]["stat"]["danmaku"]) # 弹幕 \uE6E7
@ -240,9 +250,15 @@ def binfo_image_create(video_info: dict):
mask_draw.ellipse(
(0, 0, face_size[0], face_size[1]), fill=(0, 0, 0, 255) # type: ignore
)
name_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 24)
up_title_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 20)
follower_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 22)
name_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 24
)
up_title_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 20
)
follower_font = ImageFont.truetype(
f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 22
)
i = 0
for up in up_list:

View File

@ -79,7 +79,8 @@ async def install_browser():
logger.info("正在安装 chromium")
import sys
from playwright.__main__ import main
sys.argv = ['', 'install', 'chromium']
sys.argv = ["", "install", "chromium"]
try:
main()
except SystemExit:

View File

@ -22,5 +22,7 @@ def gen_button(data: List) -> InlineKeyboardMarkup:
if button.type == 0:
buttons_url.append(InlineKeyboardButton(text=button.name, url=button.data))
elif button.type == 1:
buttons_callback.append(InlineKeyboardButton(text=button.name, callback_data=button.data))
buttons_callback.append(
InlineKeyboardButton(text=button.name, callback_data=button.data)
)
return InlineKeyboardMarkup(inline_keyboard=[buttons_callback, buttons_url])

View File

@ -69,8 +69,9 @@ def compare(origin: str, dest: str, sensitive: int):
if matrix[index] >= sensitive:
cache_array.append(new_cache(index_y, matrix[index]))
if matrix[index] > sensitive:
cache_array = remove(cache_array, new_cache(
index_y - 1, matrix[pre_index]))
cache_array = remove(
cache_array, new_cache(index_y - 1, matrix[pre_index])
)
return merge(cache_array)
@ -83,12 +84,12 @@ def render(s: str, flag: List[cache], tag: str):
"""
arr = list(s)
for i in flag:
arr.insert(i.end, f'</{tag}>')
arr.insert(i.start, f'<{tag}>')
return ''.join(arr)
arr.insert(i.end, f"</{tag}>")
arr.insert(i.start, f"<{tag}>")
return "".join(arr)
def diff_text(origin: str, dest: str, sensitive=4, tag='strong'):
def diff_text(origin: str, dest: str, sensitive=4, tag="strong"):
"""对文本重复对比给重复部分加tag
:param origin: 待查重文本
:param dest: 服务器返回的文本

View File

@ -2,7 +2,9 @@ from init import request
class Exchange:
API = "https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies.json"
API = (
"https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies.json"
)
def __init__(self):
self.inited = False
@ -24,7 +26,7 @@ class Exchange:
async def check_ex(self, message):
tlist = message.text.split()
if not 2 < len(tlist) < 5:
return 'help'
return "help"
elif len(tlist) == 3:
num = 1.0
FROM = tlist[1].upper().strip()
@ -33,27 +35,31 @@ class Exchange:
try:
num = float(tlist[1])
if len(str(int(num))) > 10:
return 'ValueBig'
return "ValueBig"
if len(str(num)) > 15:
return 'ValueSmall'
return "ValueSmall"
except ValueError:
return 'ValueError'
return "ValueError"
FROM = tlist[2].upper().strip()
TO = tlist[3].upper().strip()
if self.currencies.count(FROM) == 0:
return 'FromError'
return "FromError"
if self.currencies.count(TO) == 0:
return 'ToError'
endpoint = f"https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies/" \
f"{FROM.lower()}/{TO.lower()}.json"
return "ToError"
endpoint = (
f"https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies/"
f"{FROM.lower()}/{TO.lower()}.json"
)
try:
req = await request.get(endpoint)
rate_data = req.json()
return (f'{num} {FROM} = <b>{round(num * rate_data[TO.lower()], 2)} {TO}</b>\n'
f'Rate: <b>{round(1.0 * rate_data[TO.lower()], 6)}</b>')
return (
f"{num} {FROM} = <b>{round(num * rate_data[TO.lower()], 2)} {TO}</b>\n"
f"Rate: <b>{round(1.0 * rate_data[TO.lower()], 6)}</b>"
)
except Exception as e:
print(e)
return '请求 API 发送错误。'
return "请求 API 发送错误。"
exchange_client = Exchange()

View File

@ -5,7 +5,14 @@ from typing import Optional
from bs4 import BeautifulSoup
from init import request
from models.fragment import AuctionStatus, UserName, TON_TO_USD_RATE, Price, FragmentSubText, FragmentSub
from models.fragment import (
AuctionStatus,
UserName,
TON_TO_USD_RATE,
Price,
FragmentSubText,
FragmentSub,
)
class NotAvailable(Exception):
@ -14,7 +21,9 @@ class NotAvailable(Exception):
async def get_fragment_html(username: str):
try:
resp = await request.get(f"https://fragment.com/username/{username}", follow_redirects=False)
resp = await request.get(
f"https://fragment.com/username/{username}", follow_redirects=False
)
assert resp.status_code == 200
return resp.text
except AssertionError as e:
@ -33,25 +42,50 @@ def parse_user(username: str, html: str) -> UserName:
soup = BeautifulSoup(html, "lxml")
try:
refresh_rate(html)
status = AuctionStatus(soup.find("span", {"class": "tm-section-header-status"}).getText())
status = AuctionStatus(
soup.find("span", {"class": "tm-section-header-status"}).getText()
)
if status == AuctionStatus.OnAuction and "Highest Bid" not in html:
status = AuctionStatus.Available
user = UserName(name=username, status=status)
if user.status == AuctionStatus.Available:
user.now_price = Price(ton=int(soup.find(
"div", {"class": "table-cell-value tm-value icon-before icon-ton"}
).getText().replace(",", "")))
user.now_price = Price(
ton=int(
soup.find(
"div",
{"class": "table-cell-value tm-value icon-before icon-ton"},
)
.getText()
.replace(",", "")
)
)
elif user.status in [AuctionStatus.OnAuction, AuctionStatus.Sale]:
info = soup.find("div", {"class": "tm-section-box tm-section-bid-info"})
user.now_price = Price(ton=int(info.find(
"div", {"class": "table-cell-value tm-value icon-before icon-ton"}
).getText().replace(",", "")))
user.end_time = datetime.fromisoformat(soup.find("time", {"class": "tm-countdown-timer"})["datetime"])
user.now_price = Price(
ton=int(
info.find(
"div",
{"class": "table-cell-value tm-value icon-before icon-ton"},
)
.getText()
.replace(",", "")
)
)
user.end_time = datetime.fromisoformat(
soup.find("time", {"class": "tm-countdown-timer"})["datetime"]
)
elif user.status == AuctionStatus.Sold:
info = soup.find("div", {"class": "tm-section-box tm-section-bid-info"})
user.now_price = Price(ton=int(info.find(
"div", {"class": "table-cell-value tm-value icon-before icon-ton"}
).getText().replace(",", "")))
user.now_price = Price(
ton=int(
info.find(
"div",
{"class": "table-cell-value tm-value icon-before icon-ton"},
)
.getText()
.replace(",", "")
)
)
user.purchaser = info.find("a")["href"].split("/")[-1]
user.end_time = datetime.fromisoformat(info.find("time")["datetime"])
return user
@ -61,7 +95,9 @@ def parse_user(username: str, html: str) -> UserName:
async def search_fragment_html(username: str) -> str:
try:
resp = await request.get(f"https://fragment.com/?query={username}", follow_redirects=False)
resp = await request.get(
f"https://fragment.com/?query={username}", follow_redirects=False
)
return resp.text
except Exception as e:
raise NotAvailable from e
@ -71,7 +107,9 @@ def search_user(username: str, html: str) -> UserName:
soup = BeautifulSoup(html, "lxml")
try:
user = soup.find_all("tr", {"class": "tm-row-selectable"})[0]
status = AuctionStatus(user.find("div", {"class": "table-cell-status-thin"}).getText())
status = AuctionStatus(
user.find("div", {"class": "table-cell-status-thin"}).getText()
)
return UserName(name=username, status=status)
except (AttributeError, ValueError, IndexError) as e:
raise NotAvailable from e
@ -101,5 +139,7 @@ async def parse_sub(status: FragmentSubText, user: Optional[UserName], cid: int)
return "当前没有订阅这个用户名"
elif status == FragmentSubText.List:
if data := (await FragmentSub.get_by_cid(cid)):
return "目前已订阅:\n\n" + "\n".join([f"{i+1}. @{d.username}" for i, d in enumerate(data)])
return "目前已订阅:\n\n" + "\n".join(
[f"{i+1}. @{d.username}" for i, d in enumerate(data)]
)
return "还没有订阅任何用户名"

View File

@ -27,10 +27,14 @@ ipv6 = config.get("basic", "ipv6", fallback=ipv6)
consumer_key = config.get("twitter", "consumer_key", fallback=consumer_key)
consumer_secret = config.get("twitter", "consumer_secret", fallback=consumer_secret)
access_token_key = config.get("twitter", "access_token_key", fallback=access_token_key)
access_token_secret = config.get("twitter", "access_token_secret", fallback=access_token_secret)
access_token_secret = config.get(
"twitter", "access_token_secret", fallback=access_token_secret
)
admin = config.getint("post", "admin", fallback=admin)
lofter_channel = config.getint("post", "lofter_channel", fallback=lofter_channel)
lofter_channel_username = config.get("post", "lofter_channel_username", fallback=lofter_channel_username)
lofter_channel_username = config.get(
"post", "lofter_channel_username", fallback=lofter_channel_username
)
amap_key = config.get("api", "amap_key", fallback=amap_key)
try:
ipv6 = strtobool(ipv6)

View File

@ -4,21 +4,23 @@ from init import request
async def guess_str(key):
if key == '':
return ''
text = {'text': key}
guess_json = (await request.post("https://lab.magiconch.com/api/nbnhhsh/guess", data=text)).json()
if key == "":
return ""
text = {"text": key}
guess_json = (
await request.post("https://lab.magiconch.com/api/nbnhhsh/guess", data=text)
).json()
if len(guess_json) == 0:
return ""
guess_res = []
for num in range(len(guess_json)):
guess_res1 = json.loads(json.dumps(guess_json[num]))
guess_res1_name = guess_res1['name']
guess_res1_name = guess_res1["name"]
try:
guess_res1_ans = ", ".join(guess_res1['trans'])
guess_res1_ans = ", ".join(guess_res1["trans"])
except:
try:
guess_res1_ans = ", ".join(guess_res1['inputting'])
guess_res1_ans = ", ".join(guess_res1["inputting"])
except:
guess_res1_ans = "尚未录入"
guess_res.extend([f"词组:{guess_res1_name}" + "\n释义:" + guess_res1_ans])

View File

@ -3,21 +3,39 @@ import contextlib
def ip_info(url, ipinfo_json):
ipinfo_list = [f"查询目标: `{url}`"]
if ipinfo_json['query'] != url:
ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"])
ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' +
ipinfo_json['city'] + "`"])
ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"])
ipinfo_list.extend(["ISP `" + ipinfo_json['isp'] + "`"])
if ipinfo_json['org'] != '':
ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"])
if ipinfo_json["query"] != url:
ipinfo_list.extend(["解析地址: `" + ipinfo_json["query"] + "`"])
ipinfo_list.extend(
[
"地区: `"
+ ipinfo_json["country"]
+ " - "
+ ipinfo_json["regionName"]
+ " - "
+ ipinfo_json["city"]
+ "`"
]
)
ipinfo_list.extend(
["经纬度: `" + str(ipinfo_json["lat"]) + "," + str(ipinfo_json["lon"]) + "`"]
)
ipinfo_list.extend(["ISP `" + ipinfo_json["isp"] + "`"])
if ipinfo_json["org"] != "":
ipinfo_list.extend(["组织: `" + ipinfo_json["org"] + "`"])
with contextlib.suppress(Exception):
ipinfo_list.extend(
['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')'])
if ipinfo_json['mobile']:
ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**'])
if ipinfo_json['proxy']:
ipinfo_list.extend(['此 IP 可能为**代理 IP**'])
if ipinfo_json['hosting']:
ipinfo_list.extend(['此 IP 可能为**数据中心 IP**'])
return '\n'.join(ipinfo_list)
[
"["
+ ipinfo_json["as"]
+ "](https://bgp.he.net/"
+ ipinfo_json["as"].split()[0]
+ ")"
]
)
if ipinfo_json["mobile"]:
ipinfo_list.extend(["此 IP 可能为**蜂窝移动数据 IP**"])
if ipinfo_json["proxy"]:
ipinfo_list.extend(["此 IP 可能为**代理 IP**"])
if ipinfo_json["hosting"]:
ipinfo_list.extend(["此 IP 可能为**数据中心 IP**"])
return "\n".join(ipinfo_list)

View File

@ -7,8 +7,14 @@ from typing import List
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from pyrogram.types import InputMediaPhoto, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaDocument, \
InputMediaAnimation, Message
from pyrogram.types import (
InputMediaPhoto,
InlineKeyboardMarkup,
InlineKeyboardButton,
InputMediaDocument,
InputMediaAnimation,
Message,
)
from defs.glover import lofter_channel_username
from models.lofter import LofterPost as LofterPostModel
@ -16,7 +22,9 @@ from init import request
class LofterItem:
def __init__(self, url, audio_link, title: str, origin_url, username, name, comment, tags):
def __init__(
self, url, audio_link, title: str, origin_url, username, name, comment, tags
):
self.url = url
self.audio_link = f"https://music.163.com/#/song?id={audio_link}"
self.only_text = url is None
@ -24,15 +32,19 @@ class LofterItem:
self.origin_url = origin_url
self.post_id = origin_url.split("/post/")[1].split("?")[0]
self.username = username
self.text = f"<b>Lofter Status Info</b>\n\n" \
f"<code>{title.strip()}</code>\n\n" \
f"✍️ <a href=\"https://{username}.lofter.com/\">{name}</a>\n" \
f"{tags}\n" \
f"{comment}"
self.text = (
f"<b>Lofter Status Info</b>\n\n"
f"<code>{title.strip()}</code>\n\n"
f'✍️ <a href="https://{username}.lofter.com/">{name}</a>\n'
f"{tags}\n"
f"{comment}"
)
async def check_exists(self):
if await LofterPostModel.get_by_post_id(self.post_id):
self.text += f"\n📄 此图集已被<a href=\"https://t.me/{lofter_channel_username}\">此频道</a>收录"
self.text += (
f'\n📄 此图集已被<a href="https://t.me/{lofter_channel_username}">此频道</a>收录'
)
async def init(self):
await self.check_exists()
@ -40,31 +52,49 @@ class LofterItem:
return
file = await request.get(self.url, timeout=30)
file = BytesIO(file.content)
file.name = os.path.basename(self.url).split('?')[0]
file.name = os.path.basename(self.url).split("?")[0]
self.file = file
async def reply_to(self, message: Message, static: bool = False):
if not self.file:
await self.init()
if static:
await message.reply_document(self.file, caption=self.text, quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
await message.reply_document(
self.file,
caption=self.text,
quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username),
)
elif self.only_text:
await message.reply_text(self.text, quote=True, disable_web_page_preview=True,
reply_markup=lofter_link(self.audio_link, self.origin_url, self.username))
elif self.file.name.endswith('.gif'):
await message.reply_animation(self.file, caption=self.text, quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
await message.reply_text(
self.text,
quote=True,
disable_web_page_preview=True,
reply_markup=lofter_link(
self.audio_link, self.origin_url, self.username
),
)
elif self.file.name.endswith(".gif"):
await message.reply_animation(
self.file,
caption=self.text,
quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username),
)
else:
await message.reply_photo(self.file, caption=self.text, quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
await message.reply_photo(
self.file,
caption=self.text,
quote=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username),
)
async def export(self, static: bool = False, first: bool = False):
if not self.file:
await self.init()
if static:
return InputMediaDocument(self.file, caption=self.text if first else None)
elif self.file.name.endswith('.gif'):
elif self.file.name.endswith(".gif"):
return InputMediaAnimation(self.file, caption=self.text if first else None)
else:
return InputMediaPhoto(self.file, caption=self.text if first else None)
@ -86,17 +116,41 @@ async def get_loft(url: str) -> List[LofterItem]:
audio_link = None
audio = soup.find("div", {"class": "img"})
if audio and audio.getText().strip():
title = f"分享音乐:{audio.getText().strip()}\n\n{title}" if title else f"分享音乐:{audio.getText().strip()}"
audio_link = re.findall(r"%26id%3D(.*?)%26", audio.findAll("div")[1].get("onclick"))[0]
title = (
f"分享音乐:{audio.getText().strip()}\n\n{title}"
if title
else f"分享音乐:{audio.getText().strip()}"
)
audio_link = re.findall(
r"%26id%3D(.*?)%26", audio.findAll("div")[1].get("onclick")
)[0]
comment = soup.findAll("h3", {"class": "nctitle"})
comment_text = "".join(f"{i.getText()} " for i in comment)
if "(" not in comment_text:
comment_text = ""
tags = soup.find("meta", {"name": "Keywords"}).get("content")
tags_text = "".join(f"#{i} " for i in tags.split(","))
return [LofterItem(i.get("bigimgsrc"), audio_link, title, url, username, name, comment_text, tags_text)
for i in links] if links else [
LofterItem(None, audio_link, title, url, username, name, comment_text, tags_text)]
return (
[
LofterItem(
i.get("bigimgsrc"),
audio_link,
title,
url,
username,
name,
comment_text,
tags_text,
)
for i in links
]
if links
else [
LofterItem(
None, audio_link, title, url, username, name, comment_text, tags_text
)
]
)
def parse_loft_user(url: str, content: str):
@ -106,7 +160,9 @@ def parse_loft_user(url: str, content: str):
with contextlib.suppress(Exception):
name = soup.find("title").getText().split("-")[-1].strip()
with contextlib.suppress(Exception):
bio = " - ".join(soup.find("meta", {"name": "Description"}).get("content").split(" - ")[1:])
bio = " - ".join(
soup.find("meta", {"name": "Description"}).get("content").split(" - ")[1:]
)
with contextlib.suppress(Exception):
avatar = soup.find("link", {"rel": "shortcut icon"}).get("href").split("?")[0]
if user := soup.find("div", {"class": "selfinfo"}):
@ -130,22 +186,62 @@ async def get_loft_user(url: str):
if url and "lofter.com/post/" in url:
status_link = url
break
text = f"<b>Lofter User Info</b>\n\n" \
f"Name: <code>{name}</code>\n" \
f"Username: <a href=\"https://{username}.lofter.com\">{username}</a>\n" \
f"Bio: <code{bio}</code>"
text = (
f"<b>Lofter User Info</b>\n\n"
f"Name: <code>{name}</code>\n"
f'Username: <a href="https://{username}.lofter.com">{username}</a>\n'
f"Bio: <code{bio}</code>"
)
return text, avatar, username, status_link
def lofter_link(url, origin, username):
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin),
InlineKeyboardButton(text="Origin", url=url),
InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]]) if url \
else InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin),
InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]])
return (
InlineKeyboardMarkup(
[
[
InlineKeyboardButton(text="Source", url=origin),
InlineKeyboardButton(text="Origin", url=url),
InlineKeyboardButton(
text="Author", url=f"https://{username}.lofter.com"
),
]
]
)
if url
else InlineKeyboardMarkup(
[
[
InlineKeyboardButton(text="Source", url=origin),
InlineKeyboardButton(
text="Author", url=f"https://{username}.lofter.com"
),
]
]
)
)
def lofter_user_link(username, status_link):
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com"),
InlineKeyboardButton(text="Status", url=status_link)]]) if status_link else \
InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]])
return (
InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Author", url=f"https://{username}.lofter.com"
),
InlineKeyboardButton(text="Status", url=status_link),
]
]
)
if status_link
else InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Author", url=f"https://{username}.lofter.com"
)
]
]
)
)

View File

@ -11,9 +11,11 @@ async def get_mihoyo_screenshot(url):
try:
await page.goto(url, wait_until="networkidle", timeout=180000)
# 被删除或者进审核了
if page.url == "https://bbs.mihoyo.com/ys/404":
if page.url == "https://www.miyoushe.com/ys/404":
return None
card = await page.wait_for_selector(".mhy-article-page__main", timeout=180000, state='visible')
card = await page.wait_for_selector(
".mhy-article-page__main", timeout=180000, state="visible"
)
assert card
clip = await card.bounding_box()
assert clip

View File

@ -16,7 +16,7 @@ from models.lofter import LofterPost as LofterPostModel
from models.models.lofter import Lofter as LofterModel
from init import request, bot
pattern = re.compile(r'<[^>]+>', re.S)
pattern = re.compile(r"<[^>]+>", re.S)
class LofterPost:
@ -24,7 +24,7 @@ class LofterPost:
try:
self.grainId = int(url)
except ValueError:
self.grainId = int(url[url.find('grainId=') + 8:].split("&")[0])
self.grainId = int(url[url.find("grainId=") + 8 :].split("&")[0])
try:
self.offset = int(offset)
except ValueError:
@ -39,30 +39,47 @@ class LofterPost:
return res.json()
class Item:
def __init__(self, url, origin_url, title, user_id, username, name, tags, comment, post_id, first, static):
self.url = url.split('?')[0]
def __init__(
self,
url,
origin_url,
title,
user_id,
username,
name,
tags,
comment,
post_id,
first,
static,
):
self.url = url.split("?")[0]
self.origin_url = origin_url
self.user_id = str(user_id)
self.username = username
self.post_id = post_id
self.first = first
self.static = static
title = pattern.sub('\n', title).strip()[:500]
self.text = f"<b>Lofter Status Info</b>\n\n" \
f"<code>{title}</code>\n\n" \
f"✍️ <a href=\"https://{username}.lofter.com/\">{name}</a>\n" \
f"{tags}\n" \
f"{comment}"
title = pattern.sub("\n", title).strip()[:500]
self.text = (
f"<b>Lofter Status Info</b>\n\n"
f"<code>{title}</code>\n\n"
f'✍️ <a href="https://{username}.lofter.com/">{name}</a>\n'
f"{tags}\n"
f"{comment}"
)
async def check_exists(self):
return await LofterPostModel.get_by_post_and_user_id(self.user_id, self.post_id)
return await LofterPostModel.get_by_post_and_user_id(
self.user_id, self.post_id
)
async def add_to_db(self):
post = LofterModel(
user_id=self.user_id,
username=self.username,
post_id=self.post_id,
timestamp=int(time.time())
timestamp=int(time.time()),
)
await LofterPostModel.add_post(post)
@ -75,11 +92,25 @@ class LofterPost:
async def upload(self, file):
try:
if self.static:
await bot.send_document(lofter_channel, file, caption=self.text, disable_notification=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
await bot.send_document(
lofter_channel,
file,
caption=self.text,
disable_notification=True,
reply_markup=lofter_link(
self.url, self.origin_url, self.username
),
)
else:
await bot.send_photo(lofter_channel, file, caption=self.text, disable_notification=True,
reply_markup=lofter_link(self.url, self.origin_url, self.username))
await bot.send_photo(
lofter_channel,
file,
caption=self.text,
disable_notification=True,
reply_markup=lofter_link(
self.url, self.origin_url, self.username
),
)
except FloodWait as e:
await asyncio.sleep(e.value + 0.5)
await self.upload(file)
@ -112,19 +143,21 @@ class LofterPost:
width = photo.get("ow", 0)
height = photo.get("oh", 0)
static = abs(height - width) > 1300
datas.append(LofterPost.Item(
url,
origin_url,
title,
user_id,
username,
name,
tags,
comment,
permalink,
first,
static
))
datas.append(
LofterPost.Item(
url,
origin_url,
title,
user_id,
username,
name,
tags,
comment,
permalink,
first,
static,
)
)
first = False
return datas
@ -172,10 +205,14 @@ class LofterPost:
error += 1
if (success + error) % 10 == 0:
with contextlib.suppress(Exception):
await msg.edit(f"已成功上传{success}条,失败{error}条,跳过 {skip} 条,第 {success + error + skip}")
await msg.edit(
f"已成功上传{success}条,失败{error}条,跳过 {skip} 条,第 {success + error + skip}"
)
if self.offset == -1:
break
except Exception as e:
print(f"Error uploading file: {e}")
continue
await msg.edit(f"上传完成,成功{success}条,失败{error}条,跳过 {skip} 条,总共 {success + error + skip}")
await msg.edit(
f"上传完成,成功{success}条,失败{error}条,跳过 {skip} 条,总共 {success + error + skip}"
)

View File

@ -1,73 +1,136 @@
import contextlib
from datetime import datetime, timedelta
from defs.glover import consumer_key, consumer_secret, access_token_key, access_token_secret
from defs.glover import (
consumer_key,
consumer_secret,
access_token_key,
access_token_secret,
)
import twitter
from pyrogram.enums import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto, InputMediaVideo, \
InputMediaDocument, InputMediaAnimation
from pyrogram.types import (
InlineKeyboardMarkup,
InlineKeyboardButton,
InputMediaPhoto,
InputMediaVideo,
InputMediaDocument,
InputMediaAnimation,
)
twitter_api = twitter.Api(consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token_key=access_token_key,
access_token_secret=access_token_secret,
tweet_mode='extended',
timeout=30)
twitter_api = twitter.Api(
consumer_key=consumer_key,
consumer_secret=consumer_secret,
access_token_key=access_token_key,
access_token_secret=access_token_secret,
tweet_mode="extended",
timeout=30,
)
def twitter_link(status_id, qid, uid):
if qid:
return InlineKeyboardMarkup([[
InlineKeyboardButton(text="Source", url=f"https://twitter.com/{uid}/status/{status_id}"),
InlineKeyboardButton(text="RSource", url=f"https://twitter.com/{qid}"),
InlineKeyboardButton(text="Author", url=f"https://twitter.com/{uid}")]])
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Source",
url=f"https://twitter.com/{uid}/status/{status_id}",
),
InlineKeyboardButton(
text="RSource", url=f"https://twitter.com/{qid}"
),
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{uid}"
),
]
]
)
else:
return InlineKeyboardMarkup([[
InlineKeyboardButton(text="Source", url=f"https://twitter.com/{uid}/status/{status_id}"),
InlineKeyboardButton(text="Author", url=f"https://twitter.com/{uid}")]])
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Source",
url=f"https://twitter.com/{uid}/status/{status_id}",
),
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{uid}"
),
]
]
)
def twitter_user_link(user_username, status_link):
return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://twitter.com/{user_username}"),
InlineKeyboardButton(text="Status", url=status_link)]]) if status_link else \
InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://twitter.com/{user_username}")]])
return (
InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{user_username}"
),
InlineKeyboardButton(text="Status", url=status_link),
]
]
)
if status_link
else InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="Author", url=f"https://twitter.com/{user_username}"
)
]
]
)
)
def twitter_media(text, media_model, media_list, static: bool = False):
media_lists = []
for ff in range(len(media_model)):
if static:
media_lists.append(InputMediaDocument(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML
))
elif media_model[ff] == 'photo':
media_lists.append(InputMediaPhoto(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML
))
elif media_model[ff] == 'gif':
media_lists.append(InputMediaAnimation(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML
))
media_lists.append(
InputMediaDocument(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
elif media_model[ff] == "photo":
media_lists.append(
InputMediaPhoto(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
elif media_model[ff] == "gif":
media_lists.append(
InputMediaAnimation(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML,
)
)
else:
media_lists.append(
InputMediaVideo(
media_list[ff],
caption=text if ff == 0 else None,
parse_mode=ParseMode.HTML
))
parse_mode=ParseMode.HTML,
)
)
return media_lists
def get_twitter_time(date: str) -> str:
try:
date = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + timedelta(hours=8)
date = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + timedelta(
hours=8
)
return date.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return date
@ -76,102 +139,116 @@ def get_twitter_time(date: str) -> str:
def get_twitter_user(url_json):
user_name = url_json.name
user_username = url_json.screen_name
status = ''
status = ""
status_link = None
verified = '💎' if url_json.verified else ''
protected = '🔒' if url_json.protected else ''
verified = "💎" if url_json.verified else ""
protected = "🔒" if url_json.protected else ""
if url_json.status:
status_link = f"https://twitter.com/{user_username}/{url_json.status.id_str}"
status = f'🆕 New Status: <a href="{status_link}">{get_twitter_time(url_json.status.created_at)}</a>\n'
text = f'<b>Twitter User Info</b>\n\n' \
f'Name: {verified}{protected}<code>{user_name}</code>\n' \
f'Username: <a href="https://twitter.com/{user_username}">@{user_username}</a>\n' \
f'Bio: <code>{url_json.description}</code>\n' \
f'Joined: <code>{get_twitter_time(url_json.created_at)}</code>\n' \
f'{status}' \
f'📤 {url_json.statuses_count} ❤️{url_json.favourites_count} ' \
f'粉丝 {url_json.followers_count} 关注 {url_json.friends_count}'
text = (
f"<b>Twitter User Info</b>\n\n"
f"Name: {verified}{protected}<code>{user_name}</code>\n"
f'Username: <a href="https://twitter.com/{user_username}">@{user_username}</a>\n'
f"Bio: <code>{url_json.description}</code>\n"
f"Joined: <code>{get_twitter_time(url_json.created_at)}</code>\n"
f"{status}"
f"📤 {url_json.statuses_count} ❤️{url_json.favourites_count} "
f"粉丝 {url_json.followers_count} 关注 {url_json.friends_count}"
)
return text, user_username, status_link
def get_twitter_status(url_json):
created_at = get_twitter_time(url_json.created_at)
favorite_count = url_json.favorite_count if hasattr(url_json, 'favorite_count') else 0
retweet_count = url_json.retweet_count if hasattr(url_json, 'retweet_count') else 0
favorite_count = (
url_json.favorite_count if hasattr(url_json, "favorite_count") else 0
)
retweet_count = url_json.retweet_count if hasattr(url_json, "retweet_count") else 0
user_name = url_json.user.name
user_username = url_json.user.screen_name
text = url_json.full_text if hasattr(url_json, 'full_text') else '暂 无 内 容'
text = f'<code>{text}</code>'
verified = ''
protected = ''
text = url_json.full_text if hasattr(url_json, "full_text") else "暂 无 内 容"
text = f"<code>{text}</code>"
verified = ""
protected = ""
if url_json.user.verified:
verified = '💎'
verified = "💎"
if url_json.user.protected:
protected = '🔒'
user_text = f'{verified}{protected}<a href="https://twitter.com/{user_username}">{user_name}</a> 发表于 {created_at}' \
f'\n👍 {favorite_count} 🔁 {retweet_count}'
protected = "🔒"
user_text = (
f'{verified}{protected}<a href="https://twitter.com/{user_username}">{user_name}</a> 发表于 {created_at}'
f"\n👍 {favorite_count} 🔁 {retweet_count}"
)
media_model = []
media_list = []
media_alt_list = []
with contextlib.suppress(Exception):
media_info = url_json.media
for i in media_info:
media_url = i.url if hasattr(i, 'url') else None
media_url = i.url if hasattr(i, "url") else None
if media_url:
text = text.replace(media_url, '')
if i.type == 'photo':
media_model.append('photo')
text = text.replace(media_url, "")
if i.type == "photo":
media_model.append("photo")
media_list.append(i.media_url_https)
elif i.type == 'animated_gif':
media_model.append('gif')
media_list.append(i.video_info['variants'][0]['url'])
elif i.type == "animated_gif":
media_model.append("gif")
media_list.append(i.video_info["variants"][0]["url"])
else:
media_model.append('video')
for f in i.video_info['variants']:
if f['content_type'] == 'video/mp4':
media_list.append(f['url'])
media_model.append("video")
for f in i.video_info["variants"]:
if f["content_type"] == "video/mp4":
media_list.append(f["url"])
break
try:
media_alt_list.append(i.ext_alt_text)
except:
media_alt_list.append('')
media_alt_list.append("")
quoted_status = False
with contextlib.suppress(Exception):
quoted = url_json.quoted_status
quoted_status = quoted.user.screen_name + '/status/' + url_json.quoted_status_id_str
quoted_status = (
quoted.user.screen_name + "/status/" + url_json.quoted_status_id_str
)
quoted_created_at = get_twitter_time(quoted.created_at)
quoted_favorite_count = quoted.favorite_count if hasattr(quoted, 'favorite_count') else 0
quoted_retweet_count = quoted.retweet_count if hasattr(quoted, 'retweet_count') else 0
quoted_favorite_count = (
quoted.favorite_count if hasattr(quoted, "favorite_count") else 0
)
quoted_retweet_count = (
quoted.retweet_count if hasattr(quoted, "retweet_count") else 0
)
quoted_user_name = quoted.user.name
quoted_user_username = quoted.user.screen_name
quoted_text = quoted.full_text if hasattr(quoted, 'full_text') else '暂 无 内 容'
text += f'\n\n> <code>{quoted_text}</code>'
quoted_verified = ''
quoted_protected = ''
quoted_text = quoted.full_text if hasattr(quoted, "full_text") else "暂 无 内 容"
text += f"\n\n> <code>{quoted_text}</code>"
quoted_verified = ""
quoted_protected = ""
if quoted.user.verified:
quoted_verified = '💎'
quoted_verified = "💎"
if quoted.user.protected:
quoted_protected = '🔒'
user_text += f'\n> {quoted_verified}{quoted_protected}<a href="https://twitter.com/{quoted_user_username}">' \
f'{quoted_user_name}</a> 发表于 {quoted_created_at}' \
f'\n👍 {quoted_favorite_count} 🔁 {quoted_retweet_count}'
quoted_protected = "🔒"
user_text += (
f'\n> {quoted_verified}{quoted_protected}<a href="https://twitter.com/{quoted_user_username}">'
f"{quoted_user_name}</a> 发表于 {quoted_created_at}"
f"\n👍 {quoted_favorite_count} 🔁 {quoted_retweet_count}"
)
with contextlib.suppress(Exception):
quoted_media_info = quoted.media
for i in quoted_media_info:
media_url = i.url if hasattr(i, 'url') else None
media_url = i.url if hasattr(i, "url") else None
if media_url:
text = text.replace(media_url, '')
if i.type == 'photo':
media_model.append('photo')
text = text.replace(media_url, "")
if i.type == "photo":
media_model.append("photo")
media_list.append(i.media_url_https)
elif i.type == 'animated_gif':
media_model.append('gif')
media_list.append(i.video_info['variants'][0]['url'])
elif i.type == "animated_gif":
media_model.append("gif")
media_list.append(i.video_info["variants"][0]["url"])
else:
media_model.append('video')
media_list.append(i.video_info['variants'][0]['url'])
media_model.append("video")
media_list.append(i.video_info["variants"][0]["url"])
try:
media_alt_list.append(i.ext_alt_text)
except:
media_alt_list.append('')
media_alt_list.append("")
return text, user_text, media_model, media_list, quoted_status

View File

@ -1 +1,3 @@
headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'}
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36"
}

View File

@ -37,7 +37,9 @@ class UserMe:
user_me = UserMe()
sqlite = Sqlite()
bot = pyrogram.Client("bot", api_id=api_id, api_hash=api_hash, ipv6=ipv6, plugins=dict(root="modules"))
bot = pyrogram.Client(
"bot", api_id=api_id, api_hash=api_hash, ipv6=ipv6, plugins=dict(root="modules")
)
# temp fix topics group
setattr(pyrogram.types.Message, "old_parse", getattr(pyrogram.types.Message, "_parse"))
setattr(pyrogram.types.Message, "_parse", temp_fix)

View File

@ -74,20 +74,19 @@ class UserName(BaseModel):
@property
def text(self) -> str:
text = f"用户名:@{self.name}\n" \
f"状态:{self.status.text}\n"
text = f"用户名:@{self.name}\n" f"状态:{self.status.text}\n"
if self.status == AuctionStatus.Available:
text += f"最低出价:{self.now_price.text}\n"
elif self.status == AuctionStatus.OnAuction:
text += f"目前最高出价:{self.now_price.text}\n" \
f"距离拍卖结束:{self.end_human_time}\n"
text += f"目前最高出价:{self.now_price.text}\n" f"距离拍卖结束:{self.end_human_time}\n"
elif self.status == AuctionStatus.Sold:
text += f"售出价格:{self.now_price.text}\n" \
f"最终买家:<a href='https://tonapi.io/account/{self.purchaser}'>{self.purchaser[:12]}...</a>\n" \
f"售出时间:{self.strf_end_time}\n"
text += (
f"售出价格:{self.now_price.text}\n"
f"最终买家:<a href='https://tonapi.io/account/{self.purchaser}'>{self.purchaser[:12]}...</a>\n"
f"售出时间:{self.strf_end_time}\n"
)
elif self.status == AuctionStatus.Sale:
text += f"售价:{self.now_price.text}\n" \
f"距离出售结束:{self.end_human_time}\n"
text += f"售价:{self.now_price.text}\n" f"距离出售结束:{self.end_human_time}\n"
return text
@ -117,7 +116,11 @@ class FragmentSub:
async def get_by_cid_and_username(cid: int, username: str) -> Optional[Fragment]:
async with sqlite.Session() as session:
session = cast(AsyncSession, session)
statement = select(Fragment).where(Fragment.cid == cid).where(Fragment.username == username)
statement = (
select(Fragment)
.where(Fragment.cid == cid)
.where(Fragment.username == username)
)
results = await session.exec(statement)
return post[0] if (post := results.first()) else None

View File

@ -2,7 +2,7 @@ from sqlmodel import SQLModel, Field
class Fragment(SQLModel, table=True):
__table_args__ = dict(mysql_charset='utf8mb4', mysql_collate="utf8mb4_general_ci")
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
cid: int = Field(primary_key=True)
username: str = Field(primary_key=True)

View File

@ -2,7 +2,7 @@ from sqlmodel import SQLModel, Field
class Lofter(SQLModel, table=True):
__table_args__ = dict(mysql_charset='utf8mb4', mysql_collate="utf8mb4_general_ci")
__table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci")
user_id: str = Field(primary_key=True)
username: str = Field()

View File

@ -7,12 +7,18 @@ async def temp_fix(
users: dict,
chats: dict,
is_scheduled: bool = False,
replies: int = 1
replies: int = 1,
):
parsed = await pyrogram.types.Message.old_parse(client, message, users, chats, is_scheduled, replies) # noqa
if isinstance(message, pyrogram.raw.types.Message) and message.reply_to \
and hasattr(message.reply_to, "forum_topic") and message.reply_to.forum_topic \
and not message.reply_to.reply_to_top_id:
parsed = await pyrogram.types.Message.old_parse(
client, message, users, chats, is_scheduled, replies
) # noqa
if (
isinstance(message, pyrogram.raw.types.Message)
and message.reply_to
and hasattr(message.reply_to, "forum_topic")
and message.reply_to.forum_topic
and not message.reply_to.reply_to_top_id
):
parsed.reply_to_top_message_id = parsed.reply_to_message_id
parsed.reply_to_message_id = None
parsed.reply_to_message = None

View File

@ -18,8 +18,9 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict):
while True:
try:
# Check for message that are from channel
if (not isinstance(update, types.UpdateNewChannelMessage) or
not isinstance(update.message.from_id, types.PeerChannel)):
if not isinstance(update, types.UpdateNewChannelMessage) or not isinstance(
update.message.from_id, types.PeerChannel
):
raise ContinuePropagation
# Basic data
@ -33,9 +34,12 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict):
channel_id = int(f"-100{message.from_id.channel_id}")
# Check for linked channel
if ((message.fwd_from and
message.fwd_from.saved_from_peer == message.fwd_from.from_id == message.from_id) or
channel_id == chat_id):
if (
message.fwd_from
and message.fwd_from.saved_from_peer
== message.fwd_from.from_id
== message.from_id
) or channel_id == chat_id:
raise ContinuePropagation
# Check if blocked
@ -56,7 +60,7 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict):
send_gifs=True,
send_games=True,
send_polls=True,
)
),
)
)
await client.delete_messages(chat_id, message.id)
@ -73,8 +77,11 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict):
raise ContinuePropagation from e
@Client.on_message(filters.incoming & filters.group &
filters.command(["anti_channel_msg", f"anti_channel_msg@{user_me.username}"]))
@Client.on_message(
filters.incoming
& filters.group
& filters.command(["anti_channel_msg", f"anti_channel_msg@{user_me.username}"])
)
async def switch_anti_channel_msg(client: Client, message: Message):
# Check user
if message.from_user:
@ -93,14 +100,18 @@ async def switch_anti_channel_msg(client: Client, message: Message):
clean(message.chat.id)
await message.reply("Anti-channel is now disabled.")
else:
await message.reply("Anti-channel is already enabled.\n"
"\n"
"Tips: Use `/anti_channel_msg true` to enable or disable it.")
await message.reply(
"Anti-channel is already enabled.\n"
"\n"
"Tips: Use `/anti_channel_msg true` to enable or disable it."
)
elif switch:
add(message.chat.id, message.chat.id)
await message.reply("Anti-channel is now enabled.")
else:
await message.reply("Anti-channel is already disabled.\n"
"\n"
"Tips: Use `/anti_channel_msg true` to enable or disable it.")
await message.reply(
"Anti-channel is already disabled.\n"
"\n"
"Tips: Use `/anti_channel_msg true` to enable or disable it."
)
raise ContinuePropagation

View File

@ -6,8 +6,7 @@ from pyrogram.types import Message
from defs.ask import how_many, what_time, how_long, hif, handle_pers, who
@Client.on_message(filters.incoming &
filters.regex(r"^问"))
@Client.on_message(filters.incoming & filters.regex(r"^问"))
async def ask(_: Client, message: Message):
msg = message
if not message.text:

View File

@ -8,35 +8,46 @@ from pyrogram.types import Message, ChatPermissions
from init import user_me
@Client.on_message(filters.incoming & filters.group &
filters.command(["banme", f"banme@{user_me.username}"]))
@Client.on_message(
filters.incoming
& filters.group
& filters.command(["banme", f"banme@{user_me.username}"])
)
async def ban_me_command(client: Client, message: Message):
args = str(message.text).strip()
# 检查是否有倍数参数
if multiple_text := re.search(r'^(\d+)倍$', args):
if multiple_text := re.search(r"^(\d+)倍$", args):
multiple = int(multiple_text.groups()[0])
else:
multiple = 1
# 检查bot和用户身份
if (await client.get_chat_member(message.chat.id, "self")).status != ChatMemberStatus.ADMINISTRATOR:
await message.reply('Bot非群管理员, 无法执行禁言操作QAQ')
if (
await client.get_chat_member(message.chat.id, "self")
).status != ChatMemberStatus.ADMINISTRATOR:
await message.reply("Bot非群管理员, 无法执行禁言操作QAQ")
return
if not message.from_user:
# 频道
await message.reply('你是个频道, 别来凑热闹OvO')
await message.reply("你是个频道, 别来凑热闹OvO")
return
member = (await client.get_chat_member(message.chat.id, message.from_user.id)).status
member = (
await client.get_chat_member(message.chat.id, message.from_user.id)
).status
if member in [ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.OWNER]:
await message.reply('你也是个管理, 别来凑热闹OvO')
await message.reply("你也是个管理, 别来凑热闹OvO")
return
# 随机禁言时间
random_time = 2 * int(random.gauss(128 * multiple, 640 * multiple // 10))
act_time = 60 if random_time < 60 else min(random_time, 2591940)
msg = f'既然你那么想被口球的话, 那我就成全你吧!\n送你一份{act_time // 60}{act_time % 60}秒禁言套餐哦, 谢谢惠顾~'
msg = f"既然你那么想被口球的话, 那我就成全你吧!\n送你一份{act_time // 60}{act_time % 60}秒禁言套餐哦, 谢谢惠顾~"
await client.restrict_chat_member(message.chat.id, message.from_user.id, ChatPermissions(),
datetime.now() + timedelta(seconds=act_time))
await client.restrict_chat_member(
message.chat.id,
message.from_user.id,
ChatPermissions(),
datetime.now() + timedelta(seconds=act_time),
)
await message.reply(msg)

View File

@ -4,15 +4,23 @@ from io import BytesIO
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.types import Message
from defs.bilibili import b23_extract, video_info_get, binfo_image_create, get_dynamic_screenshot_pc
from defs.bilibili import (
b23_extract,
video_info_get,
binfo_image_create,
get_dynamic_screenshot_pc,
)
from defs.button import gen_button, Button
@Client.on_message(filters.incoming & filters.text &
filters.regex(r"av(\d{1,12})|BV(1[A-Za-z0-9]{2}4.1.7[A-Za-z0-9]{2})|b23.tv"))
@Client.on_message(
filters.incoming
& filters.text
& filters.regex(r"av(\d{1,12})|BV(1[A-Za-z0-9]{2}4.1.7[A-Za-z0-9]{2})|b23.tv")
)
async def bili_resolve(_: Client, message: Message):
"""
解析 bilibili 链接
解析 bilibili 链接
"""
if "b23.tv" in message.text:
message.text = await b23_extract(message.text)
@ -26,13 +34,16 @@ async def bili_resolve(_: Client, message: Message):
await message.reply_photo(
image,
quote=True,
reply_markup=gen_button([Button(0, "Link", "https://b23.tv/" + video_info["data"]["bvid"])])
reply_markup=gen_button(
[Button(0, "Link", "https://b23.tv/" + video_info["data"]["bvid"])]
),
)
raise ContinuePropagation
@Client.on_message(filters.incoming & filters.text &
filters.regex(r"t.bilibili.com/([0-9]*)"))
@Client.on_message(
filters.incoming & filters.text & filters.regex(r"t.bilibili.com/([0-9]*)")
)
async def bili_dynamic(_: Client, message: Message):
# sourcery skip: use-named-expression
p = re.compile(r"t.bilibili.com/([0-9]*)")
@ -47,6 +58,8 @@ async def bili_dynamic(_: Client, message: Message):
await message.reply_photo(
photo,
quote=True,
reply_markup=gen_button([Button(0, "Link", f"https://t.bilibili.com/{dynamic_id}")])
reply_markup=gen_button(
[Button(0, "Link", f"https://t.bilibili.com/{dynamic_id}")]
),
)
raise ContinuePropagation

View File

@ -10,15 +10,13 @@ from pyrogram.types import Message
book_of_answers: List[str]
with open(f"resources{sep}text{sep}book_of_answers.json", "r", encoding="utf-8") as file:
with open(
f"resources{sep}text{sep}book_of_answers.json", "r", encoding="utf-8"
) as file:
book_of_answers = json.load(file)
@Client.on_message(filters.incoming &
filters.regex(r"^答案之书$"))
@Client.on_message(filters.incoming & filters.regex(r"^答案之书$"))
async def book_of_answer(_: Client, message: Message):
await message.reply_text(
f"{choice(book_of_answers)}",
quote=True
)
await message.reply_text(f"{choice(book_of_answers)}", quote=True)
raise ContinuePropagation

View File

@ -5,7 +5,11 @@ from init import user_me
def mention_chat(chat: Chat) -> str:
return f'<a href="https://t.me/{chat.username}">{chat.title}</a>' if chat.username else chat.title
return (
f'<a href="https://t.me/{chat.username}">{chat.title}</a>'
if chat.username
else chat.title
)
def get_dc(message: Message):
@ -27,15 +31,18 @@ def get_dc(message: Message):
return dc, mention
@Client.on_message(filters.incoming &
filters.command(["dc", f"dc@{user_me.username}"]))
@Client.on_message(filters.incoming & filters.command(["dc", f"dc@{user_me.username}"]))
async def dc_command(_: Client, message: Message):
geo_dic = {'1': '美国-佛罗里达州-迈阿密', '2': '荷兰-阿姆斯特丹', '3': '美国-佛罗里达州-迈阿密',
'4': '荷兰-阿姆斯特丹', '5': '新加坡'}
geo_dic = {
"1": "美国-佛罗里达州-迈阿密",
"2": "荷兰-阿姆斯特丹",
"3": "美国-佛罗里达州-迈阿密",
"4": "荷兰-阿姆斯特丹",
"5": "新加坡",
}
dc, mention = get_dc(message)
if dc:
text = f"{mention}所在数据中心为: <b>DC{dc}</b>\n" \
f"该数据中心位于 <b>{geo_dic[str(dc)]}</b>"
text = f"{mention}所在数据中心为: <b>DC{dc}</b>\n" f"该数据中心位于 <b>{geo_dic[str(dc)]}</b>"
else:
text = f"{mention}需要先<b>设置头像并且对我可见。</b>"
await message.reply(text)

View File

@ -12,29 +12,34 @@ async def exchange_refresh() -> None:
await exchange_client.refresh()
@Client.on_message(filters.incoming &
filters.command(["exchange", f"exchange@{user_me.username}"]))
@Client.on_message(
filters.incoming & filters.command(["exchange", f"exchange@{user_me.username}"])
)
async def exchange_command(_: Client, message: Message):
if not exchange_client.inited:
await exchange_client.refresh()
if not exchange_client.inited:
return await message.reply("获取汇率数据出现错误!")
text = await exchange_client.check_ex(message)
if text == 'help':
text = '该指令可用于查询汇率。\n使用方式举例:\n/exchange USD CNY - 1 USD 等于多少 CNY\n' \
'/exchange 11 CNY USD - 11 CNY 等于多少 USD'
elif text == 'ValueError':
text = '金额不合法'
elif text == 'ValueBig':
text = '我寻思你也没这么多钱啊。'
elif text == 'ValueSmall':
text = '小数点后的数字咋这么多?'
elif text == 'FromError':
text = '不支持的起源币种。'
elif text == 'ToError':
text = '不支持的目标币种。'
if text == "help":
text = (
"该指令可用于查询汇率。\n使用方式举例:\n/exchange USD CNY - 1 USD 等于多少 CNY\n"
"/exchange 11 CNY USD - 11 CNY 等于多少 USD"
)
elif text == "ValueError":
text = "金额不合法"
elif text == "ValueBig":
text = "我寻思你也没这么多钱啊。"
elif text == "ValueSmall":
text = "小数点后的数字咋这么多?"
elif text == "FromError":
text = "不支持的起源币种。"
elif text == "ToError":
text = "不支持的目标币种。"
else:
return await message.reply(text)
reply_ = await message.reply(text)
if message.chat.type == ChatType.PRIVATE:
await reply_.reply('支持货币: <code>' + ', '.join(exchange_client.currencies) + '</code>')
await reply_.reply(
"支持货币: <code>" + ", ".join(exchange_client.currencies) + "</code>"
)

View File

@ -3,8 +3,14 @@ import re
from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import ChatMemberStatus
from pyrogram.types import InlineQuery, InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardMarkup, \
InlineKeyboardButton, Message
from pyrogram.types import (
InlineQuery,
InlineQueryResultArticle,
InputTextMessageContent,
InlineKeyboardMarkup,
InlineKeyboardButton,
Message,
)
from models.fragment import FragmentSubText, FragmentSub, AuctionStatus
from defs.fragment import parse_fragment, NotAvailable, parse_sub
@ -14,7 +20,9 @@ from scheduler import scheduler, add_delete_message_job
QUERY_PATTERN = re.compile(r"^@\w[a-zA-Z0-9_]{3,32}$")
@Client.on_message(filters.incoming & filters.command(["username", f"username@{user_me.username}"]))
@Client.on_message(
filters.incoming & filters.command(["username", f"username@{user_me.username}"])
)
async def fragment_command(client: Client, message: Message):
status = None
user = None
@ -70,8 +78,7 @@ async def fragment_inline(_, inline_query: InlineQuery):
user = await parse_fragment(username)
text = user.text
except NotAvailable:
text = f"用户名:@{username}\n" \
f"状态:暂未开放购买\n"
text = f"用户名:@{username}\n" f"状态:暂未开放购买\n"
except Exception:
text = ""
if not text:
@ -89,19 +96,20 @@ async def fragment_inline(_, inline_query: InlineQuery):
description="点击发送详情",
reply_markup=InlineKeyboardMarkup(
[
[InlineKeyboardButton(
"Open",
url=f"https://fragment.com/username/{username}"
)]
[
InlineKeyboardButton(
"Open", url=f"https://fragment.com/username/{username}"
)
]
]
)
),
),
]
await inline_query.answer(
results=results,
switch_pm_text="查询成功",
switch_pm_parameter="start",
cache_time=0
cache_time=0,
)

View File

@ -8,8 +8,7 @@ from pyrogram.types import Message
from defs.friend_say import ImageUtil
@Client.on_message(filters.incoming & filters.group &
filters.regex(r"^我有个朋友"))
@Client.on_message(filters.incoming & filters.group & filters.regex(r"^我有个朋友"))
async def friend_say(client: Client, message: Message):
if not message.reply_to_message:
raise ContinuePropagation
@ -20,13 +19,14 @@ async def friend_say(client: Client, message: Message):
# Get Gravatar
avatar = None
if message.reply_to_message.from_user.photo:
avatar = await client.download_media(message.reply_to_message.from_user.photo.big_file_id,
file_name="avatar.jpg")
avatar = await client.download_media(
message.reply_to_message.from_user.photo.big_file_id, file_name="avatar.jpg"
)
# Get Name
user_name = message.reply_to_message.from_user.first_name
# Create image
if avatar:
with open(avatar, 'rb') as fh:
with open(avatar, "rb") as fh:
buf = BytesIO(fh.read())
ava = ImageUtil(100, 100, background=buf)
else:

View File

@ -12,11 +12,12 @@ from init import user_me, request
REQUEST_URL = f"https://restapi.amap.com/v3/geocode/geo?key={amap_key}&"
@Client.on_message(filters.incoming &
filters.command(["geo", f"geo@{user_me.username}"]))
@Client.on_message(
filters.incoming & filters.command(["geo", f"geo@{user_me.username}"])
)
async def geo_command(_: Client, message: Message):
if len(message.command) <= 1:
await message.reply('没有找到要查询的中国 经纬度/地址 ...')
await message.reply("没有找到要查询的中国 经纬度/地址 ...")
return
mode, lat, lon = "address", 0, 0 # noqa
with contextlib.suppress(ValueError, IndexError):
@ -24,7 +25,11 @@ async def geo_command(_: Client, message: Message):
mode = "location"
if mode == "location":
try:
geo = (await request.get(f"{REQUEST_URL.replace('/geo?', '/regeo?')}location={lat},{lon}")).json()
geo = (
await request.get(
f"{REQUEST_URL.replace('/geo?', '/regeo?')}location={lat},{lon}"
)
).json()
formatted_address = geo["regeocode"]["formatted_address"]
assert isinstance(formatted_address, str)
except (KeyError, AssertionError):
@ -32,12 +37,19 @@ async def geo_command(_: Client, message: Message):
return
else:
try:
geo = (await request.get(f"{REQUEST_URL}address={quote(' '.join(message.command[1:]).strip())}")).json()
geo = (
await request.get(
f"{REQUEST_URL}address={quote(' '.join(message.command[1:]).strip())}"
)
).json()
formatted_address = geo["geocodes"][0]["formatted_address"]
lat, lon = geo["geocodes"][0]["location"].split(",")
except (KeyError, IndexError, ValueError):
await message.reply(f"无法解析地址 {' '.join(message.command[1:]).strip()}", quote=True)
await message.reply(
f"无法解析地址 {' '.join(message.command[1:]).strip()}", quote=True
)
return
msg = await message.reply_location(longitude=float(lat), latitude=float(lon), quote=True)
await msg.reply(f"坐标:`{lat},{lon}`\n"
f"地址:<b>{formatted_address}</b>", quote=True)
msg = await message.reply_location(
longitude=float(lat), latitude=float(lon), quote=True
)
await msg.reply(f"坐标:`{lat},{lon}`\n" f"地址:<b>{formatted_address}</b>", quote=True)

View File

@ -5,23 +5,26 @@ from defs.guess import guess_str
from init import user_me
@Client.on_message(filters.incoming &
filters.command(["guess", f"guess@{user_me.username}"]))
@Client.on_message(
filters.incoming & filters.command(["guess", f"guess@{user_me.username}"])
)
async def guess_command(_: Client, message: Message):
msg = await message.reply('正在查询中...')
if str(message.text.split()) == 1:
text = await guess_str(message.reply_to_message.text)
if text == '':
text = '没有匹配到拼音首字母缩写'
msg = await message.reply("正在查询中...")
if len(message.text.split()) == 1:
text = ""
if reply := message.reply_to_message:
text = await guess_str(reply.text)
if text == "":
text = "没有匹配到拼音首字母缩写"
await msg.edit(text)
else:
rep_text = ''
rep_text = ""
if reply := message.reply_to_message:
rep_text += await guess_str(reply.text)
text = await guess_str(message.text[7:])
if not rep_text and not text:
await msg.edit('没有匹配到拼音首字母缩写')
await msg.edit("没有匹配到拼音首字母缩写")
elif not rep_text:
await msg.edit(f'{text}')
await msg.edit(f"{text}")
else:
await msg.edit(f'{rep_text}{text}')
await msg.edit(f"{rep_text}{text}")

View File

@ -7,64 +7,82 @@ from defs.ip import ip_info
from init import user_me, request
@Client.on_message(filters.incoming &
filters.command(["ip", f"ip@{user_me.username}"]))
@Client.on_message(filters.incoming & filters.command(["ip", f"ip@{user_me.username}"]))
async def ip_command(_: Client, message: Message):
msg = await message.reply('正在查询中...')
rep_text = ''
msg = await message.reply("正在查询中...")
rep_text = ""
reply = message.reply_to_message
if reply:
if reply.entities:
for num in range(0, len(reply.entities)):
url = reply.text[
reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
reply.entities[num].offset : reply.entities[num].offset
+ reply.entities[num].length
]
url = urlparse(url)
if url.hostname or url.path:
if url.hostname:
url = url.hostname
else:
url = url.path
ipinfo_json = (await request.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,"
"lat,lon,isp,"
"org,as,mobile,proxy,hosting,query")).json()
if ipinfo_json['status'] == 'fail':
ipinfo_json = (
await request.get(
"http://ip-api.com/json/"
+ url
+ "?fields=status,message,country,regionName,city,"
"lat,lon,isp,"
"org,as,mobile,proxy,hosting,query"
)
).json()
if ipinfo_json["status"] == "fail":
pass
elif ipinfo_json['status'] == 'success':
elif ipinfo_json["status"] == "success":
rep_text = ip_info(url, ipinfo_json)
text = ''
text = ""
if message.entities:
for num in range(0, len(message.entities)):
url = message.text[
message.entities[num].offset:message.entities[num].offset + message.entities[num].length]
message.entities[num].offset : message.entities[num].offset
+ message.entities[num].length
]
url = urlparse(url)
if url.hostname or url.path:
if url.hostname:
url = url.hostname
else:
url = url.path
ipinfo_json = (await request.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,"
"lon,isp,"
"org,as,mobile,proxy,hosting,query")).json()
if ipinfo_json['status'] == 'fail':
ipinfo_json = (
await request.get(
"http://ip-api.com/json/"
+ url
+ "?fields=status,message,country,regionName,city,lat,"
"lon,isp,"
"org,as,mobile,proxy,hosting,query"
)
).json()
if ipinfo_json["status"] == "fail":
pass
elif ipinfo_json['status'] == 'success':
elif ipinfo_json["status"] == "success":
text = ip_info(url, ipinfo_json)
if text == '':
if text == "":
url = message.text[4:]
if not url == '':
ipinfo_json = (await request.get(
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,"
"lon,isp,"
"org,as,mobile,proxy,hosting,query")).json()
if ipinfo_json['status'] == 'fail':
if not url == "":
ipinfo_json = (
await request.get(
"http://ip-api.com/json/"
+ url
+ "?fields=status,message,country,regionName,city,lat,"
"lon,isp,"
"org,as,mobile,proxy,hosting,query"
)
).json()
if ipinfo_json["status"] == "fail":
pass
elif ipinfo_json['status'] == 'success':
elif ipinfo_json["status"] == "success":
text = ip_info(url, ipinfo_json)
if rep_text == '' and text == '':
await msg.edit('没有找到要查询的 ip/域名 ...')
elif not rep_text == '' and not text == '':
await msg.edit(f'{rep_text}\n================\n{text}')
if rep_text == "" and text == "":
await msg.edit("没有找到要查询的 ip/域名 ...")
elif not rep_text == "" and not text == "":
await msg.edit(f"{rep_text}\n================\n{text}")
else:
await msg.edit(f'{rep_text}{text}')
await msg.edit(f"{rep_text}{text}")

View File

@ -5,8 +5,7 @@ from pyrogram.types import Message
from defs.lofter import get_loft, input_media, get_loft_user, lofter_user_link
@Client.on_message(filters.incoming & filters.text &
filters.regex(r"lofter.com"))
@Client.on_message(filters.incoming & filters.text & filters.regex(r"lofter.com"))
async def lofter_share(_: Client, message: Message):
if not message.text:
return
@ -15,7 +14,7 @@ async def lofter_share(_: Client, message: Message):
for num in range(len(message.entities)):
entity = message.entities[num]
if entity.type == MessageEntityType.URL:
url = message.text[entity.offset:entity.offset + entity.length]
url = message.text[entity.offset : entity.offset + entity.length]
elif entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
else:
@ -27,7 +26,9 @@ async def lofter_share(_: Client, message: Message):
if len(img) == 1:
await img[0].reply_to(message, static=static)
else:
await message.reply_media_group(media=await input_media(img[:9], static), quote=True)
await message.reply_media_group(
media=await input_media(img[:9], static), quote=True
)
elif "front/blog" not in url:
text, avatar, username, status_link = await get_loft_user(url)
if avatar:
@ -35,14 +36,14 @@ async def lofter_share(_: Client, message: Message):
avatar,
caption=text,
quote=True,
reply_markup=lofter_user_link(username, status_link)
reply_markup=lofter_user_link(username, status_link),
)
else:
await message.reply_text(
text,
quote=True,
disable_web_page_preview=True,
reply_markup=lofter_user_link(username, status_link)
reply_markup=lofter_user_link(username, status_link),
)
except Exception as e:
print(e)

View File

@ -4,8 +4,7 @@ from pyrogram.types import Message
from defs.luxun import process_pic
@Client.on_message(filters.incoming &
filters.regex(r"^鲁迅说过"))
@Client.on_message(filters.incoming & filters.regex(r"^鲁迅说过"))
async def luxun_say(_: Client, message: Message):
args = message.text[4:]
if not args:

View File

@ -10,17 +10,20 @@ from defs.mihoyo_bbs import get_mihoyo_screenshot
from defs.button import gen_button, Button
@Client.on_message(filters.incoming & filters.text &
filters.regex(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+'))
@Client.on_message(
filters.incoming
& filters.text
& filters.regex(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+")
)
async def bili_dynamic(_: Client, message: Message):
# sourcery skip: use-named-expression
try:
p = re.compile(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+')
p = re.compile(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+")
article = p.search(message.text)
if article:
article_url = article.group()
if not article_url.startswith(('https://', 'http://')):
article_url = f'https://{article_url}'
if not article_url.startswith(("https://", "http://")):
article_url = f"https://{article_url}"
image = await get_mihoyo_screenshot(article_url)
if image:
# 将bytes结果转化为字节流
@ -32,13 +35,13 @@ async def bili_dynamic(_: Client, message: Message):
await message.reply_document(
document=photo,
quote=True,
reply_markup=gen_button([Button(0, "Link", article_url)])
reply_markup=gen_button([Button(0, "Link", article_url)]),
)
else:
await message.reply_photo(
photo,
quote=True,
reply_markup=gen_button([Button(0, "Link", article_url)])
reply_markup=gen_button([Button(0, "Link", article_url)]),
)
except Exception as e:
print(f"截取米哈游帖子时发生错误:{e}")

View File

@ -5,11 +5,15 @@ from pyrogram.types import Message
from defs.post import LofterPost
@Client.on_message(filters.incoming & filters.private & filters.user(admin) &
filters.command(["lofter_post"]))
@Client.on_message(
filters.incoming
& filters.private
& filters.user(admin)
& filters.command(["lofter_post"])
)
async def lofter_post_command(client: Client, message: Message):
"""
抓取 lofter 粮单
抓取 lofter 粮单
"""
data = message.text.split(" ")
offset = 0

View File

@ -24,7 +24,12 @@ async def repeater_handler(client: Client, message: Message):
msg = t_msg = message.text
if not msg:
raise ContinuePropagation
if msg.startswith("/") or msg.startswith("!") or msg.startswith(",") or msg.startswith(""):
if (
msg.startswith("/")
or msg.startswith("!")
or msg.startswith(",")
or msg.startswith("")
):
raise ContinuePropagation
if msg != last_msg[group_id] or msg == last_repeat_msg[group_id]:

View File

@ -16,14 +16,18 @@ des = """本机器人特性:
"""
@Client.on_message(filters.incoming & filters.private &
filters.command(["start"]))
@Client.on_message(filters.incoming & filters.private & filters.command(["start"]))
async def start_command(_: Client, message: Message):
"""
回应机器人信息
回应机器人信息
"""
await message.reply(des,
quote=True,
reply_markup=gen_button(
[Button(0, "Gitlab", "https://gitlab.com/Xtao-Labs/iShotaBot"),
Button(0, "Github", "https://github.com/Xtao-Labs/iShotaBot")]))
await message.reply(
des,
quote=True,
reply_markup=gen_button(
[
Button(0, "Gitlab", "https://gitlab.com/Xtao-Labs/iShotaBot"),
Button(0, "Github", "https://github.com/Xtao-Labs/iShotaBot"),
]
),
)

View File

@ -6,12 +6,17 @@ from pyrogram import Client, filters, ContinuePropagation
from pyrogram.enums import MessageEntityType, ParseMode
from pyrogram.types import Message
from defs.twitter_api import twitter_api, get_twitter_status, twitter_link, twitter_media, twitter_user_link, \
get_twitter_user
from defs.twitter_api import (
twitter_api,
get_twitter_status,
twitter_link,
twitter_media,
twitter_user_link,
get_twitter_user,
)
@Client.on_message(filters.incoming & filters.text &
filters.regex(r"twitter.com/"))
@Client.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/"))
async def twitter_share(client: Client, message: Message):
if not message.text:
return
@ -20,21 +25,27 @@ async def twitter_share(client: Client, message: Message):
for num in range(len(message.entities)):
entity = message.entities[num]
if entity.type == MessageEntityType.URL:
url = message.text[entity.offset:entity.offset + entity.length]
url = message.text[entity.offset : entity.offset + entity.length]
elif entity.type == MessageEntityType.TEXT_LINK:
url = entity.url
else:
continue
url = urlparse(url)
if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]:
if url.path.find('status') >= 0:
status_id = str(url.path[url.path.find('status') + 7:].split("/")[0]).split("?")[0]
if url.path.find("status") >= 0:
status_id = str(
url.path[url.path.find("status") + 7 :].split("/")[0]
).split("?")[0]
url_json = None
with ThreadPoolExecutor() as executor:
for _ in range(3):
try:
future = client.loop.run_in_executor(executor, twitter_api.GetStatus, status_id)
url_json = await asyncio.wait_for(future, timeout=30, loop=client.loop)
future = client.loop.run_in_executor(
executor, twitter_api.GetStatus, status_id
)
url_json = await asyncio.wait_for(
future, timeout=30, loop=client.loop
)
except Exception as e:
print(e)
return
@ -42,15 +53,24 @@ async def twitter_share(client: Client, message: Message):
break
if not url_json:
return
text, user_text, media_model, media_list, quoted_status = get_twitter_status(url_json)
text = f'<b>Twitter Status Info</b>\n\n{text}\n\n{user_text}'
(
text,
user_text,
media_model,
media_list,
quoted_status,
) = get_twitter_status(url_json)
text = f"<b>Twitter Status Info</b>\n\n{text}\n\n{user_text}"
if len(media_model) == 0:
await client.send_message(
message.chat.id, text,
message.chat.id,
text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
reply_to_message_id=message.id,
reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name)
reply_markup=twitter_link(
url_json.id, quoted_status, url_json.user.screen_name
),
)
elif len(media_model) == 1:
if static:
@ -59,23 +79,35 @@ async def twitter_share(client: Client, message: Message):
caption=text,
quote=True,
parse_mode=ParseMode.HTML,
reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name)
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
elif media_model[0] == 'photo':
elif media_model[0] == "photo":
await message.reply_photo(
media_list[0],
caption=text,
parse_mode=ParseMode.HTML,
quote=True,
reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name)
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
elif media_model[0] == 'gif':
elif media_model[0] == "gif":
await message.reply_animation(
media_list[0],
caption=text,
parse_mode=ParseMode.HTML,
quote=True,
reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name)
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
else:
await message.reply_video(
@ -83,22 +115,32 @@ async def twitter_share(client: Client, message: Message):
caption=text,
parse_mode=ParseMode.HTML,
quote=True,
reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name)
reply_markup=twitter_link(
url_json.id,
quoted_status,
url_json.user.screen_name,
),
)
else:
await client.send_media_group(message.chat.id,
media=twitter_media(text, media_model, media_list, static))
elif url.path == '/':
await client.send_media_group(
message.chat.id,
media=twitter_media(text, media_model, media_list, static),
)
elif url.path == "/":
return
else:
# 解析用户
uid = url.path.replace('/', '')
uid = url.path.replace("/", "")
url_json = None
with ThreadPoolExecutor() as executor:
for _ in range(3):
try:
future = client.loop.run_in_executor(executor, twitter_api.GetUser, None, uid)
url_json = await asyncio.wait_for(future, timeout=30, loop=client.loop)
future = client.loop.run_in_executor(
executor, twitter_api.GetUser, None, uid
)
url_json = await asyncio.wait_for(
future, timeout=30, loop=client.loop
)
except Exception as e:
print(e)
return
@ -108,10 +150,10 @@ async def twitter_share(client: Client, message: Message):
return
text, user_username, status_link = get_twitter_user(url_json)
await message.reply_photo(
url_json.profile_image_url_https.replace('_normal', ''),
url_json.profile_image_url_https.replace("_normal", ""),
caption=text,
quote=True,
reply_markup=twitter_user_link(user_username, status_link)
reply_markup=twitter_user_link(user_username, status_link),
)
except Exception as e:
print(e)

View File

@ -1,4 +1,4 @@
pyrogram==2.0.63
pyrogram==2.0.96
tgcrypto==1.2.5
httpx
pillow

View File

@ -17,9 +17,12 @@ async def delete_message(message: Message) -> bool:
def add_delete_message_job(message: Message, delete_seconds: int = 60):
scheduler.add_job(
delete_message, "date",
delete_message,
"date",
id=f"{message.chat.id}|{message.id}|delete_message",
name=f"{message.chat.id}|{message.id}|delete_message",
args=[message],
run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai")) + datetime.timedelta(seconds=delete_seconds),
replace_existing=True)
run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
+ datetime.timedelta(seconds=delete_seconds),
replace_existing=True,
)