From 4a0ae1500d259cc9febb45d779057cbdc3958865 Mon Sep 17 00:00:00 2001 From: xtaodada Date: Thu, 12 Jan 2023 21:19:54 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- defs/ask.py | 8 +- defs/asoulcnki.py | 79 +++++------ defs/bilibili.py | 34 +++-- defs/browser.py | 3 +- defs/button.py | 4 +- defs/diff.py | 13 +- defs/exchange.py | 30 +++-- defs/fragment.py | 72 +++++++--- defs/glover.py | 8 +- defs/guess.py | 16 ++- defs/ip.py | 50 ++++--- defs/lofter.py | 172 ++++++++++++++++++------ defs/mihoyo_bbs.py | 6 +- defs/post.py | 99 +++++++++----- defs/twitter_api.py | 265 ++++++++++++++++++++++++------------- headers.py | 4 +- init.py | 4 +- models/fragment.py | 23 ++-- models/models/fragment.py | 2 +- models/models/lofter.py | 2 +- models/temp_fix.py | 16 ++- modules/anti_channel.py | 39 ++++-- modules/ask.py | 3 +- modules/banme.py | 33 +++-- modules/bilibili.py | 29 ++-- modules/book_of_answers.py | 12 +- modules/dc.py | 21 ++- modules/exchange.py | 37 +++--- modules/fragment.py | 30 +++-- modules/friend_say.py | 10 +- modules/geo.py | 30 +++-- modules/guess.py | 25 ++-- modules/ip.py | 82 +++++++----- modules/lofter.py | 13 +- modules/luxun.py | 3 +- modules/mihoyo_bbs.py | 17 ++- modules/post.py | 10 +- modules/repeater.py | 7 +- modules/start.py | 20 +-- modules/twitter_api.py | 96 ++++++++++---- requirements.txt | 2 +- scheduler.py | 9 +- 42 files changed, 952 insertions(+), 486 deletions(-) diff --git a/defs/ask.py b/defs/ask.py index 985aaa2..9a7370f 100644 --- a/defs/ask.py +++ b/defs/ask.py @@ -22,8 +22,12 @@ async def what_time(message: str) -> str: async def how_long(message: str) -> str: unit = ["秒", "小时", "天", "周", "月", "年", "世纪"] while re.findall("多久|多长时间", message): - message = message.replace("多久", str(secrets.choice(range(99))) + secrets.choice(unit), 1) - message = message.replace("多长时间", str(secrets.choice(range(99))) + secrets.choice(unit), 1) + message = message.replace( + "多久", str(secrets.choice(range(99))) + secrets.choice(unit), 1 + ) + message = message.replace( + "多长时间", str(secrets.choice(range(99))) + secrets.choice(unit), 1 + ) return message diff --git a/defs/asoulcnki.py b/defs/asoulcnki.py index 577e7ad..950ce1c 100644 --- a/defs/asoulcnki.py +++ b/defs/asoulcnki.py @@ -17,28 +17,30 @@ article_tpl = env.from_string(article_data) async def check_text(text: str): try: - url = 'https://asoulcnki.asia/v1/api/check' + url = "https://asoulcnki.asia/v1/api/check" async with httpx.AsyncClient() as client: - resp = await client.post(url=url, json={'text': text}) + resp = await client.post(url=url, json={"text": text}) result = resp.json() - if result['code'] != 0: + if result["code"] != 0: return None, None - data = result['data'] - if not data['related']: - return None, '没有找到重复的小作文捏' + data = result["data"] + if not data["related"]: + return None, "没有找到重复的小作文捏" - rate = data['rate'] - related = data['related'][0] - reply_url = str(related['reply_url']).strip() - reply = related['reply'] + rate = data["rate"] + related = data["related"][0] + reply_url = str(related["reply_url"]).strip() + reply = related["reply"] - msg = ['枝网文本复制检测报告', - '', - '总复制比 {:.2f}%'.format(rate * 100), - f'相似小作文: 地点 - ' - f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["ctime"]))}',] + msg = [ + "枝网文本复制检测报告", + "", + "总复制比 {:.2f}%".format(rate * 100), + f'相似小作文: 地点 - ' + f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(reply["ctime"]))}', + ] image = await render_reply(reply, diff=text) if not image: @@ -51,32 +53,29 @@ async def check_text(text: str): async def random_text(keyword: str = ""): try: - url = 'https://asoulcnki.asia/v1/api/ranking' - params = { - 'pageSize': 10, - 'pageNum': 1, - 'timeRangeMode': 0, - 'sortMode': 0 - } + url = "https://asoulcnki.asia/v1/api/ranking" + params = {"pageSize": 10, "pageNum": 1, "timeRangeMode": 0, "sortMode": 0} if keyword: - params['keywords'] = keyword + params["keywords"] = keyword else: - params['pageNum'] = random.randint(1, 100) + params["pageNum"] = random.randint(1, 100) async with httpx.AsyncClient() as client: resp = await client.get(url=url, params=params) result = resp.json() - if result['code'] != 0: + if result["code"] != 0: return None, None - replies = result['data']['replies'] + replies = result["data"]["replies"] if not replies: - return None, '没有找到小作文捏' + return None, "没有找到小作文捏" reply = random.choice(replies) image = await render_reply(reply) - reply_url = f"https://t.bilibili.com/{reply['dynamic_id']}/#reply{reply['rpid']}" + reply_url = ( + f"https://t.bilibili.com/{reply['dynamic_id']}/#reply{reply['rpid']}" + ) if not image: return None, f'转到小作文' return image, f'转到小作文' @@ -88,25 +87,27 @@ async def random_text(keyword: str = ""): async def render_reply(reply: dict, diff: str = ""): try: article = {} - article['username'] = reply['m_name'] - article['like'] = reply['like_num'] - article['all_like'] = reply['similar_like_sum'] - article['quote'] = reply['similar_count'] - article['text'] = diff_text( - diff, reply['content']) if diff else reply['content'] - article['time'] = time.strftime( - "%Y-%m-%d", time.localtime(reply['ctime'])) + article["username"] = reply["m_name"] + article["like"] = reply["like_num"] + article["all_like"] = reply["similar_like_sum"] + article["quote"] = reply["similar_count"] + article["text"] = ( + diff_text(diff, reply["content"]) if diff else reply["content"] + ) + article["time"] = time.strftime("%Y-%m-%d", time.localtime(reply["ctime"])) html = await article_tpl.render_async(article=article) - img_raw = await html_to_pic(html, wait=0, viewport={"width": 500, "height": 100}) + img_raw = await html_to_pic( + html, wait=0, viewport={"width": 500, "height": 100} + ) # 将bytes结果转化为字节流 bytes_stream = BytesIO(img_raw) # 读取到图片 img = Image.open(bytes_stream) imgByteArr = BytesIO() # 初始化一个空字节流 - img.save(imgByteArr, format('PNG')) # 把我们得图片以 PNG 保存到空字节流 + img.save(imgByteArr, format("PNG")) # 把我们得图片以 PNG 保存到空字节流 imgByteArr = imgByteArr.getvalue() # 无视指针,获取全部内容,类型由io流变成bytes。 - with open(f"data{sep}asoulcnki.png", 'wb') as i: + with open(f"data{sep}asoulcnki.png", "wb") as i: i.write(imgByteArr) return f"data{sep}asoulcnki.png" except Exception as e: diff --git a/defs/bilibili.py b/defs/bilibili.py index 2637ce4..a95f0ef 100644 --- a/defs/bilibili.py +++ b/defs/bilibili.py @@ -39,7 +39,7 @@ def cut_text(old_str, cut): next_str = next_str[1:] elif s == "\n": str_list.append(next_str[: i - 1]) - next_str = next_str[i - 1:] + next_str = next_str[i - 1 :] si = 0 i = 0 continue @@ -93,7 +93,9 @@ async def video_info_get(cid): ) video_info = video_info.json() elif cid[:2] == "BV": - video_info = httpx.get(f"http://api.bilibili.com/x/web-interface/view?bvid={cid}") + video_info = httpx.get( + f"http://api.bilibili.com/x/web-interface/view?bvid={cid}" + ) video_info = video_info.json() else: return @@ -126,7 +128,9 @@ def binfo_image_create(video_info: dict): minutes, seconds = divmod(video_info["data"]["duration"], 60) hours, minutes = divmod(minutes, 60) video_time = f"{hours:02d}:{minutes:02d}:{seconds:02d}" - tiem_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 30) + tiem_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 30 + ) draw = ImageDraw.Draw(pic) draw.text((10, 305), video_time, "white", tiem_font) @@ -137,7 +141,9 @@ def binfo_image_create(video_info: dict): # 标题 title = video_info["data"]["title"] - title_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 25) + title_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 25 + ) title_cut_str = "\n".join(cut_text(title, 40)) _, title_text_y = title_font.getsize_multiline(title_cut_str) title_bg = Image.new("RGB", (560, title_text_y + 23), "#F5F5F7") @@ -150,7 +156,9 @@ def binfo_image_create(video_info: dict): dynamic = ( "该视频没有简介" if video_info["data"]["desc"] == "" else video_info["data"]["desc"] ) - dynamic_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18) + dynamic_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 18 + ) dynamic_cut_str = "\n".join(cut_text(dynamic, 58)) _, dynamic_text_y = dynamic_font.getsize_multiline(dynamic_cut_str) dynamic_bg = Image.new("RGB", (560, dynamic_text_y + 24), "#F5F5F7") @@ -163,7 +171,9 @@ def binfo_image_create(video_info: dict): # 视频数据 icon_font = ImageFont.truetype(f"resources{sep}font{sep}vanfont.ttf", 46) icon_color = (247, 145, 185) - info_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 26) + info_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 26 + ) view = numf(video_info["data"]["stat"]["view"]) # 播放 \uE6E6 danmaku = numf(video_info["data"]["stat"]["danmaku"]) # 弹幕 \uE6E7 @@ -240,9 +250,15 @@ def binfo_image_create(video_info: dict): mask_draw.ellipse( (0, 0, face_size[0], face_size[1]), fill=(0, 0, 0, 255) # type: ignore ) - name_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 24) - up_title_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 20) - follower_font = ImageFont.truetype(f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 22) + name_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 24 + ) + up_title_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-bold.ttf", 20 + ) + follower_font = ImageFont.truetype( + f"resources{sep}font{sep}sarasa-mono-sc-semibold.ttf", 22 + ) i = 0 for up in up_list: diff --git a/defs/browser.py b/defs/browser.py index d8566f6..c4f3449 100644 --- a/defs/browser.py +++ b/defs/browser.py @@ -79,7 +79,8 @@ async def install_browser(): logger.info("正在安装 chromium") import sys from playwright.__main__ import main - sys.argv = ['', 'install', 'chromium'] + + sys.argv = ["", "install", "chromium"] try: main() except SystemExit: diff --git a/defs/button.py b/defs/button.py index 2ef93d9..9e75d4d 100644 --- a/defs/button.py +++ b/defs/button.py @@ -22,5 +22,7 @@ def gen_button(data: List) -> InlineKeyboardMarkup: if button.type == 0: buttons_url.append(InlineKeyboardButton(text=button.name, url=button.data)) elif button.type == 1: - buttons_callback.append(InlineKeyboardButton(text=button.name, callback_data=button.data)) + buttons_callback.append( + InlineKeyboardButton(text=button.name, callback_data=button.data) + ) return InlineKeyboardMarkup(inline_keyboard=[buttons_callback, buttons_url]) diff --git a/defs/diff.py b/defs/diff.py index 5708bbe..223df94 100644 --- a/defs/diff.py +++ b/defs/diff.py @@ -69,8 +69,9 @@ def compare(origin: str, dest: str, sensitive: int): if matrix[index] >= sensitive: cache_array.append(new_cache(index_y, matrix[index])) if matrix[index] > sensitive: - cache_array = remove(cache_array, new_cache( - index_y - 1, matrix[pre_index])) + cache_array = remove( + cache_array, new_cache(index_y - 1, matrix[pre_index]) + ) return merge(cache_array) @@ -83,12 +84,12 @@ def render(s: str, flag: List[cache], tag: str): """ arr = list(s) for i in flag: - arr.insert(i.end, f'') - arr.insert(i.start, f'<{tag}>') - return ''.join(arr) + arr.insert(i.end, f"") + arr.insert(i.start, f"<{tag}>") + return "".join(arr) -def diff_text(origin: str, dest: str, sensitive=4, tag='strong'): +def diff_text(origin: str, dest: str, sensitive=4, tag="strong"): """对文本重复对比,给重复部分加tag :param origin: 待查重文本 :param dest: 服务器返回的文本 diff --git a/defs/exchange.py b/defs/exchange.py index e495d98..38fe3f7 100644 --- a/defs/exchange.py +++ b/defs/exchange.py @@ -2,7 +2,9 @@ from init import request class Exchange: - API = "https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies.json" + API = ( + "https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies.json" + ) def __init__(self): self.inited = False @@ -24,7 +26,7 @@ class Exchange: async def check_ex(self, message): tlist = message.text.split() if not 2 < len(tlist) < 5: - return 'help' + return "help" elif len(tlist) == 3: num = 1.0 FROM = tlist[1].upper().strip() @@ -33,27 +35,31 @@ class Exchange: try: num = float(tlist[1]) if len(str(int(num))) > 10: - return 'ValueBig' + return "ValueBig" if len(str(num)) > 15: - return 'ValueSmall' + return "ValueSmall" except ValueError: - return 'ValueError' + return "ValueError" FROM = tlist[2].upper().strip() TO = tlist[3].upper().strip() if self.currencies.count(FROM) == 0: - return 'FromError' + return "FromError" if self.currencies.count(TO) == 0: - return 'ToError' - endpoint = f"https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies/" \ - f"{FROM.lower()}/{TO.lower()}.json" + return "ToError" + endpoint = ( + f"https://cdn.jsdelivr.net/gh/fawazahmed0/currency-api@1/latest/currencies/" + f"{FROM.lower()}/{TO.lower()}.json" + ) try: req = await request.get(endpoint) rate_data = req.json() - return (f'{num} {FROM} = {round(num * rate_data[TO.lower()], 2)} {TO}\n' - f'Rate: {round(1.0 * rate_data[TO.lower()], 6)}') + return ( + f"{num} {FROM} = {round(num * rate_data[TO.lower()], 2)} {TO}\n" + f"Rate: {round(1.0 * rate_data[TO.lower()], 6)}" + ) except Exception as e: print(e) - return '请求 API 发送错误。' + return "请求 API 发送错误。" exchange_client = Exchange() diff --git a/defs/fragment.py b/defs/fragment.py index da2290e..5d21f15 100644 --- a/defs/fragment.py +++ b/defs/fragment.py @@ -5,7 +5,14 @@ from typing import Optional from bs4 import BeautifulSoup from init import request -from models.fragment import AuctionStatus, UserName, TON_TO_USD_RATE, Price, FragmentSubText, FragmentSub +from models.fragment import ( + AuctionStatus, + UserName, + TON_TO_USD_RATE, + Price, + FragmentSubText, + FragmentSub, +) class NotAvailable(Exception): @@ -14,7 +21,9 @@ class NotAvailable(Exception): async def get_fragment_html(username: str): try: - resp = await request.get(f"https://fragment.com/username/{username}", follow_redirects=False) + resp = await request.get( + f"https://fragment.com/username/{username}", follow_redirects=False + ) assert resp.status_code == 200 return resp.text except AssertionError as e: @@ -33,25 +42,50 @@ def parse_user(username: str, html: str) -> UserName: soup = BeautifulSoup(html, "lxml") try: refresh_rate(html) - status = AuctionStatus(soup.find("span", {"class": "tm-section-header-status"}).getText()) + status = AuctionStatus( + soup.find("span", {"class": "tm-section-header-status"}).getText() + ) if status == AuctionStatus.OnAuction and "Highest Bid" not in html: status = AuctionStatus.Available user = UserName(name=username, status=status) if user.status == AuctionStatus.Available: - user.now_price = Price(ton=int(soup.find( - "div", {"class": "table-cell-value tm-value icon-before icon-ton"} - ).getText().replace(",", ""))) + user.now_price = Price( + ton=int( + soup.find( + "div", + {"class": "table-cell-value tm-value icon-before icon-ton"}, + ) + .getText() + .replace(",", "") + ) + ) elif user.status in [AuctionStatus.OnAuction, AuctionStatus.Sale]: info = soup.find("div", {"class": "tm-section-box tm-section-bid-info"}) - user.now_price = Price(ton=int(info.find( - "div", {"class": "table-cell-value tm-value icon-before icon-ton"} - ).getText().replace(",", ""))) - user.end_time = datetime.fromisoformat(soup.find("time", {"class": "tm-countdown-timer"})["datetime"]) + user.now_price = Price( + ton=int( + info.find( + "div", + {"class": "table-cell-value tm-value icon-before icon-ton"}, + ) + .getText() + .replace(",", "") + ) + ) + user.end_time = datetime.fromisoformat( + soup.find("time", {"class": "tm-countdown-timer"})["datetime"] + ) elif user.status == AuctionStatus.Sold: info = soup.find("div", {"class": "tm-section-box tm-section-bid-info"}) - user.now_price = Price(ton=int(info.find( - "div", {"class": "table-cell-value tm-value icon-before icon-ton"} - ).getText().replace(",", ""))) + user.now_price = Price( + ton=int( + info.find( + "div", + {"class": "table-cell-value tm-value icon-before icon-ton"}, + ) + .getText() + .replace(",", "") + ) + ) user.purchaser = info.find("a")["href"].split("/")[-1] user.end_time = datetime.fromisoformat(info.find("time")["datetime"]) return user @@ -61,7 +95,9 @@ def parse_user(username: str, html: str) -> UserName: async def search_fragment_html(username: str) -> str: try: - resp = await request.get(f"https://fragment.com/?query={username}", follow_redirects=False) + resp = await request.get( + f"https://fragment.com/?query={username}", follow_redirects=False + ) return resp.text except Exception as e: raise NotAvailable from e @@ -71,7 +107,9 @@ def search_user(username: str, html: str) -> UserName: soup = BeautifulSoup(html, "lxml") try: user = soup.find_all("tr", {"class": "tm-row-selectable"})[0] - status = AuctionStatus(user.find("div", {"class": "table-cell-status-thin"}).getText()) + status = AuctionStatus( + user.find("div", {"class": "table-cell-status-thin"}).getText() + ) return UserName(name=username, status=status) except (AttributeError, ValueError, IndexError) as e: raise NotAvailable from e @@ -101,5 +139,7 @@ async def parse_sub(status: FragmentSubText, user: Optional[UserName], cid: int) return "当前没有订阅这个用户名" elif status == FragmentSubText.List: if data := (await FragmentSub.get_by_cid(cid)): - return "目前已订阅:\n\n" + "\n".join([f"{i+1}. @{d.username}" for i, d in enumerate(data)]) + return "目前已订阅:\n\n" + "\n".join( + [f"{i+1}. @{d.username}" for i, d in enumerate(data)] + ) return "还没有订阅任何用户名" diff --git a/defs/glover.py b/defs/glover.py index 26f0a19..e4bd162 100644 --- a/defs/glover.py +++ b/defs/glover.py @@ -27,10 +27,14 @@ ipv6 = config.get("basic", "ipv6", fallback=ipv6) consumer_key = config.get("twitter", "consumer_key", fallback=consumer_key) consumer_secret = config.get("twitter", "consumer_secret", fallback=consumer_secret) access_token_key = config.get("twitter", "access_token_key", fallback=access_token_key) -access_token_secret = config.get("twitter", "access_token_secret", fallback=access_token_secret) +access_token_secret = config.get( + "twitter", "access_token_secret", fallback=access_token_secret +) admin = config.getint("post", "admin", fallback=admin) lofter_channel = config.getint("post", "lofter_channel", fallback=lofter_channel) -lofter_channel_username = config.get("post", "lofter_channel_username", fallback=lofter_channel_username) +lofter_channel_username = config.get( + "post", "lofter_channel_username", fallback=lofter_channel_username +) amap_key = config.get("api", "amap_key", fallback=amap_key) try: ipv6 = strtobool(ipv6) diff --git a/defs/guess.py b/defs/guess.py index 799a84a..339254d 100644 --- a/defs/guess.py +++ b/defs/guess.py @@ -4,21 +4,23 @@ from init import request async def guess_str(key): - if key == '': - return '' - text = {'text': key} - guess_json = (await request.post("https://lab.magiconch.com/api/nbnhhsh/guess", data=text)).json() + if key == "": + return "" + text = {"text": key} + guess_json = ( + await request.post("https://lab.magiconch.com/api/nbnhhsh/guess", data=text) + ).json() if len(guess_json) == 0: return "" guess_res = [] for num in range(len(guess_json)): guess_res1 = json.loads(json.dumps(guess_json[num])) - guess_res1_name = guess_res1['name'] + guess_res1_name = guess_res1["name"] try: - guess_res1_ans = ", ".join(guess_res1['trans']) + guess_res1_ans = ", ".join(guess_res1["trans"]) except: try: - guess_res1_ans = ", ".join(guess_res1['inputting']) + guess_res1_ans = ", ".join(guess_res1["inputting"]) except: guess_res1_ans = "尚未录入" guess_res.extend([f"词组:{guess_res1_name}" + "\n释义:" + guess_res1_ans]) diff --git a/defs/ip.py b/defs/ip.py index 9be8165..906d14c 100644 --- a/defs/ip.py +++ b/defs/ip.py @@ -3,21 +3,39 @@ import contextlib def ip_info(url, ipinfo_json): ipinfo_list = [f"查询目标: `{url}`"] - if ipinfo_json['query'] != url: - ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"]) - ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' + - ipinfo_json['city'] + "`"]) - ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"]) - ipinfo_list.extend(["ISP: `" + ipinfo_json['isp'] + "`"]) - if ipinfo_json['org'] != '': - ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"]) + if ipinfo_json["query"] != url: + ipinfo_list.extend(["解析地址: `" + ipinfo_json["query"] + "`"]) + ipinfo_list.extend( + [ + "地区: `" + + ipinfo_json["country"] + + " - " + + ipinfo_json["regionName"] + + " - " + + ipinfo_json["city"] + + "`" + ] + ) + ipinfo_list.extend( + ["经纬度: `" + str(ipinfo_json["lat"]) + "," + str(ipinfo_json["lon"]) + "`"] + ) + ipinfo_list.extend(["ISP: `" + ipinfo_json["isp"] + "`"]) + if ipinfo_json["org"] != "": + ipinfo_list.extend(["组织: `" + ipinfo_json["org"] + "`"]) with contextlib.suppress(Exception): ipinfo_list.extend( - ['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')']) - if ipinfo_json['mobile']: - ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**']) - if ipinfo_json['proxy']: - ipinfo_list.extend(['此 IP 可能为**代理 IP**']) - if ipinfo_json['hosting']: - ipinfo_list.extend(['此 IP 可能为**数据中心 IP**']) - return '\n'.join(ipinfo_list) + [ + "[" + + ipinfo_json["as"] + + "](https://bgp.he.net/" + + ipinfo_json["as"].split()[0] + + ")" + ] + ) + if ipinfo_json["mobile"]: + ipinfo_list.extend(["此 IP 可能为**蜂窝移动数据 IP**"]) + if ipinfo_json["proxy"]: + ipinfo_list.extend(["此 IP 可能为**代理 IP**"]) + if ipinfo_json["hosting"]: + ipinfo_list.extend(["此 IP 可能为**数据中心 IP**"]) + return "\n".join(ipinfo_list) diff --git a/defs/lofter.py b/defs/lofter.py index 722897c..f6019f2 100644 --- a/defs/lofter.py +++ b/defs/lofter.py @@ -7,8 +7,14 @@ from typing import List from urllib.parse import urlparse from bs4 import BeautifulSoup -from pyrogram.types import InputMediaPhoto, InlineKeyboardMarkup, InlineKeyboardButton, InputMediaDocument, \ - InputMediaAnimation, Message +from pyrogram.types import ( + InputMediaPhoto, + InlineKeyboardMarkup, + InlineKeyboardButton, + InputMediaDocument, + InputMediaAnimation, + Message, +) from defs.glover import lofter_channel_username from models.lofter import LofterPost as LofterPostModel @@ -16,7 +22,9 @@ from init import request class LofterItem: - def __init__(self, url, audio_link, title: str, origin_url, username, name, comment, tags): + def __init__( + self, url, audio_link, title: str, origin_url, username, name, comment, tags + ): self.url = url self.audio_link = f"https://music.163.com/#/song?id={audio_link}" self.only_text = url is None @@ -24,15 +32,19 @@ class LofterItem: self.origin_url = origin_url self.post_id = origin_url.split("/post/")[1].split("?")[0] self.username = username - self.text = f"Lofter Status Info\n\n" \ - f"{title.strip()}\n\n" \ - f"✍️ {name}\n" \ - f"{tags}\n" \ - f"{comment}" + self.text = ( + f"Lofter Status Info\n\n" + f"{title.strip()}\n\n" + f'✍️ {name}\n' + f"{tags}\n" + f"{comment}" + ) async def check_exists(self): if await LofterPostModel.get_by_post_id(self.post_id): - self.text += f"\n📄 此图集已被此频道收录" + self.text += ( + f'\n📄 此图集已被此频道收录' + ) async def init(self): await self.check_exists() @@ -40,31 +52,49 @@ class LofterItem: return file = await request.get(self.url, timeout=30) file = BytesIO(file.content) - file.name = os.path.basename(self.url).split('?')[0] + file.name = os.path.basename(self.url).split("?")[0] self.file = file async def reply_to(self, message: Message, static: bool = False): if not self.file: await self.init() if static: - await message.reply_document(self.file, caption=self.text, quote=True, - reply_markup=lofter_link(self.url, self.origin_url, self.username)) + await message.reply_document( + self.file, + caption=self.text, + quote=True, + reply_markup=lofter_link(self.url, self.origin_url, self.username), + ) elif self.only_text: - await message.reply_text(self.text, quote=True, disable_web_page_preview=True, - reply_markup=lofter_link(self.audio_link, self.origin_url, self.username)) - elif self.file.name.endswith('.gif'): - await message.reply_animation(self.file, caption=self.text, quote=True, - reply_markup=lofter_link(self.url, self.origin_url, self.username)) + await message.reply_text( + self.text, + quote=True, + disable_web_page_preview=True, + reply_markup=lofter_link( + self.audio_link, self.origin_url, self.username + ), + ) + elif self.file.name.endswith(".gif"): + await message.reply_animation( + self.file, + caption=self.text, + quote=True, + reply_markup=lofter_link(self.url, self.origin_url, self.username), + ) else: - await message.reply_photo(self.file, caption=self.text, quote=True, - reply_markup=lofter_link(self.url, self.origin_url, self.username)) + await message.reply_photo( + self.file, + caption=self.text, + quote=True, + reply_markup=lofter_link(self.url, self.origin_url, self.username), + ) async def export(self, static: bool = False, first: bool = False): if not self.file: await self.init() if static: return InputMediaDocument(self.file, caption=self.text if first else None) - elif self.file.name.endswith('.gif'): + elif self.file.name.endswith(".gif"): return InputMediaAnimation(self.file, caption=self.text if first else None) else: return InputMediaPhoto(self.file, caption=self.text if first else None) @@ -86,17 +116,41 @@ async def get_loft(url: str) -> List[LofterItem]: audio_link = None audio = soup.find("div", {"class": "img"}) if audio and audio.getText().strip(): - title = f"分享音乐:{audio.getText().strip()}\n\n{title}" if title else f"分享音乐:{audio.getText().strip()}" - audio_link = re.findall(r"%26id%3D(.*?)%26", audio.findAll("div")[1].get("onclick"))[0] + title = ( + f"分享音乐:{audio.getText().strip()}\n\n{title}" + if title + else f"分享音乐:{audio.getText().strip()}" + ) + audio_link = re.findall( + r"%26id%3D(.*?)%26", audio.findAll("div")[1].get("onclick") + )[0] comment = soup.findAll("h3", {"class": "nctitle"}) comment_text = "".join(f"{i.getText()} " for i in comment) if "(" not in comment_text: comment_text = "" tags = soup.find("meta", {"name": "Keywords"}).get("content") tags_text = "".join(f"#{i} " for i in tags.split(",")) - return [LofterItem(i.get("bigimgsrc"), audio_link, title, url, username, name, comment_text, tags_text) - for i in links] if links else [ - LofterItem(None, audio_link, title, url, username, name, comment_text, tags_text)] + return ( + [ + LofterItem( + i.get("bigimgsrc"), + audio_link, + title, + url, + username, + name, + comment_text, + tags_text, + ) + for i in links + ] + if links + else [ + LofterItem( + None, audio_link, title, url, username, name, comment_text, tags_text + ) + ] + ) def parse_loft_user(url: str, content: str): @@ -106,7 +160,9 @@ def parse_loft_user(url: str, content: str): with contextlib.suppress(Exception): name = soup.find("title").getText().split("-")[-1].strip() with contextlib.suppress(Exception): - bio = " - ".join(soup.find("meta", {"name": "Description"}).get("content").split(" - ")[1:]) + bio = " - ".join( + soup.find("meta", {"name": "Description"}).get("content").split(" - ")[1:] + ) with contextlib.suppress(Exception): avatar = soup.find("link", {"rel": "shortcut icon"}).get("href").split("?")[0] if user := soup.find("div", {"class": "selfinfo"}): @@ -130,22 +186,62 @@ async def get_loft_user(url: str): if url and "lofter.com/post/" in url: status_link = url break - text = f"Lofter User Info\n\n" \ - f"Name: {name}\n" \ - f"Username: {username}\n" \ - f"Bio: " + text = ( + f"Lofter User Info\n\n" + f"Name: {name}\n" + f'Username: {username}\n' + f"Bio: " + ) return text, avatar, username, status_link def lofter_link(url, origin, username): - return InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin), - InlineKeyboardButton(text="Origin", url=url), - InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]]) if url \ - else InlineKeyboardMarkup([[InlineKeyboardButton(text="Source", url=origin), - InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]]) + return ( + InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton(text="Source", url=origin), + InlineKeyboardButton(text="Origin", url=url), + InlineKeyboardButton( + text="Author", url=f"https://{username}.lofter.com" + ), + ] + ] + ) + if url + else InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton(text="Source", url=origin), + InlineKeyboardButton( + text="Author", url=f"https://{username}.lofter.com" + ), + ] + ] + ) + ) def lofter_user_link(username, status_link): - return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com"), - InlineKeyboardButton(text="Status", url=status_link)]]) if status_link else \ - InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://{username}.lofter.com")]]) + return ( + InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="Author", url=f"https://{username}.lofter.com" + ), + InlineKeyboardButton(text="Status", url=status_link), + ] + ] + ) + if status_link + else InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="Author", url=f"https://{username}.lofter.com" + ) + ] + ] + ) + ) diff --git a/defs/mihoyo_bbs.py b/defs/mihoyo_bbs.py index 2838330..1f0e803 100644 --- a/defs/mihoyo_bbs.py +++ b/defs/mihoyo_bbs.py @@ -11,9 +11,11 @@ async def get_mihoyo_screenshot(url): try: await page.goto(url, wait_until="networkidle", timeout=180000) # 被删除或者进审核了 - if page.url == "https://bbs.mihoyo.com/ys/404": + if page.url == "https://www.miyoushe.com/ys/404": return None - card = await page.wait_for_selector(".mhy-article-page__main", timeout=180000, state='visible') + card = await page.wait_for_selector( + ".mhy-article-page__main", timeout=180000, state="visible" + ) assert card clip = await card.bounding_box() assert clip diff --git a/defs/post.py b/defs/post.py index c9199bc..5267d80 100644 --- a/defs/post.py +++ b/defs/post.py @@ -16,7 +16,7 @@ from models.lofter import LofterPost as LofterPostModel from models.models.lofter import Lofter as LofterModel from init import request, bot -pattern = re.compile(r'<[^>]+>', re.S) +pattern = re.compile(r"<[^>]+>", re.S) class LofterPost: @@ -24,7 +24,7 @@ class LofterPost: try: self.grainId = int(url) except ValueError: - self.grainId = int(url[url.find('grainId=') + 8:].split("&")[0]) + self.grainId = int(url[url.find("grainId=") + 8 :].split("&")[0]) try: self.offset = int(offset) except ValueError: @@ -39,30 +39,47 @@ class LofterPost: return res.json() class Item: - def __init__(self, url, origin_url, title, user_id, username, name, tags, comment, post_id, first, static): - self.url = url.split('?')[0] + def __init__( + self, + url, + origin_url, + title, + user_id, + username, + name, + tags, + comment, + post_id, + first, + static, + ): + self.url = url.split("?")[0] self.origin_url = origin_url self.user_id = str(user_id) self.username = username self.post_id = post_id self.first = first self.static = static - title = pattern.sub('\n', title).strip()[:500] - self.text = f"Lofter Status Info\n\n" \ - f"{title}\n\n" \ - f"✍️ {name}\n" \ - f"{tags}\n" \ - f"{comment}" + title = pattern.sub("\n", title).strip()[:500] + self.text = ( + f"Lofter Status Info\n\n" + f"{title}\n\n" + f'✍️ {name}\n' + f"{tags}\n" + f"{comment}" + ) async def check_exists(self): - return await LofterPostModel.get_by_post_and_user_id(self.user_id, self.post_id) + return await LofterPostModel.get_by_post_and_user_id( + self.user_id, self.post_id + ) async def add_to_db(self): post = LofterModel( user_id=self.user_id, username=self.username, post_id=self.post_id, - timestamp=int(time.time()) + timestamp=int(time.time()), ) await LofterPostModel.add_post(post) @@ -75,11 +92,25 @@ class LofterPost: async def upload(self, file): try: if self.static: - await bot.send_document(lofter_channel, file, caption=self.text, disable_notification=True, - reply_markup=lofter_link(self.url, self.origin_url, self.username)) + await bot.send_document( + lofter_channel, + file, + caption=self.text, + disable_notification=True, + reply_markup=lofter_link( + self.url, self.origin_url, self.username + ), + ) else: - await bot.send_photo(lofter_channel, file, caption=self.text, disable_notification=True, - reply_markup=lofter_link(self.url, self.origin_url, self.username)) + await bot.send_photo( + lofter_channel, + file, + caption=self.text, + disable_notification=True, + reply_markup=lofter_link( + self.url, self.origin_url, self.username + ), + ) except FloodWait as e: await asyncio.sleep(e.value + 0.5) await self.upload(file) @@ -112,19 +143,21 @@ class LofterPost: width = photo.get("ow", 0) height = photo.get("oh", 0) static = abs(height - width) > 1300 - datas.append(LofterPost.Item( - url, - origin_url, - title, - user_id, - username, - name, - tags, - comment, - permalink, - first, - static - )) + datas.append( + LofterPost.Item( + url, + origin_url, + title, + user_id, + username, + name, + tags, + comment, + permalink, + first, + static, + ) + ) first = False return datas @@ -172,10 +205,14 @@ class LofterPost: error += 1 if (success + error) % 10 == 0: with contextlib.suppress(Exception): - await msg.edit(f"已成功上传{success}条,失败{error}条,跳过 {skip} 条,第 {success + error + skip} 条") + await msg.edit( + f"已成功上传{success}条,失败{error}条,跳过 {skip} 条,第 {success + error + skip} 条" + ) if self.offset == -1: break except Exception as e: print(f"Error uploading file: {e}") continue - await msg.edit(f"上传完成,成功{success}条,失败{error}条,跳过 {skip} 条,总共 {success + error + skip} 条") + await msg.edit( + f"上传完成,成功{success}条,失败{error}条,跳过 {skip} 条,总共 {success + error + skip} 条" + ) diff --git a/defs/twitter_api.py b/defs/twitter_api.py index 62a6aa9..f57388d 100644 --- a/defs/twitter_api.py +++ b/defs/twitter_api.py @@ -1,73 +1,136 @@ import contextlib from datetime import datetime, timedelta -from defs.glover import consumer_key, consumer_secret, access_token_key, access_token_secret +from defs.glover import ( + consumer_key, + consumer_secret, + access_token_key, + access_token_secret, +) import twitter from pyrogram.enums import ParseMode -from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, InputMediaPhoto, InputMediaVideo, \ - InputMediaDocument, InputMediaAnimation +from pyrogram.types import ( + InlineKeyboardMarkup, + InlineKeyboardButton, + InputMediaPhoto, + InputMediaVideo, + InputMediaDocument, + InputMediaAnimation, +) -twitter_api = twitter.Api(consumer_key=consumer_key, - consumer_secret=consumer_secret, - access_token_key=access_token_key, - access_token_secret=access_token_secret, - tweet_mode='extended', - timeout=30) +twitter_api = twitter.Api( + consumer_key=consumer_key, + consumer_secret=consumer_secret, + access_token_key=access_token_key, + access_token_secret=access_token_secret, + tweet_mode="extended", + timeout=30, +) def twitter_link(status_id, qid, uid): if qid: - return InlineKeyboardMarkup([[ - InlineKeyboardButton(text="Source", url=f"https://twitter.com/{uid}/status/{status_id}"), - InlineKeyboardButton(text="RSource", url=f"https://twitter.com/{qid}"), - InlineKeyboardButton(text="Author", url=f"https://twitter.com/{uid}")]]) + return InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="Source", + url=f"https://twitter.com/{uid}/status/{status_id}", + ), + InlineKeyboardButton( + text="RSource", url=f"https://twitter.com/{qid}" + ), + InlineKeyboardButton( + text="Author", url=f"https://twitter.com/{uid}" + ), + ] + ] + ) else: - return InlineKeyboardMarkup([[ - InlineKeyboardButton(text="Source", url=f"https://twitter.com/{uid}/status/{status_id}"), - InlineKeyboardButton(text="Author", url=f"https://twitter.com/{uid}")]]) + return InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="Source", + url=f"https://twitter.com/{uid}/status/{status_id}", + ), + InlineKeyboardButton( + text="Author", url=f"https://twitter.com/{uid}" + ), + ] + ] + ) def twitter_user_link(user_username, status_link): - return InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://twitter.com/{user_username}"), - InlineKeyboardButton(text="Status", url=status_link)]]) if status_link else \ - InlineKeyboardMarkup([[InlineKeyboardButton(text="Author", url=f"https://twitter.com/{user_username}")]]) + return ( + InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="Author", url=f"https://twitter.com/{user_username}" + ), + InlineKeyboardButton(text="Status", url=status_link), + ] + ] + ) + if status_link + else InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="Author", url=f"https://twitter.com/{user_username}" + ) + ] + ] + ) + ) def twitter_media(text, media_model, media_list, static: bool = False): media_lists = [] for ff in range(len(media_model)): if static: - media_lists.append(InputMediaDocument( - media_list[ff], - caption=text if ff == 0 else None, - parse_mode=ParseMode.HTML - )) - elif media_model[ff] == 'photo': - media_lists.append(InputMediaPhoto( - media_list[ff], - caption=text if ff == 0 else None, - parse_mode=ParseMode.HTML - )) - elif media_model[ff] == 'gif': - media_lists.append(InputMediaAnimation( - media_list[ff], - caption=text if ff == 0 else None, - parse_mode=ParseMode.HTML - )) + media_lists.append( + InputMediaDocument( + media_list[ff], + caption=text if ff == 0 else None, + parse_mode=ParseMode.HTML, + ) + ) + elif media_model[ff] == "photo": + media_lists.append( + InputMediaPhoto( + media_list[ff], + caption=text if ff == 0 else None, + parse_mode=ParseMode.HTML, + ) + ) + elif media_model[ff] == "gif": + media_lists.append( + InputMediaAnimation( + media_list[ff], + caption=text if ff == 0 else None, + parse_mode=ParseMode.HTML, + ) + ) else: media_lists.append( InputMediaVideo( media_list[ff], caption=text if ff == 0 else None, - parse_mode=ParseMode.HTML - )) + parse_mode=ParseMode.HTML, + ) + ) return media_lists def get_twitter_time(date: str) -> str: try: - date = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + timedelta(hours=8) + date = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y") + timedelta( + hours=8 + ) return date.strftime("%Y-%m-%d %H:%M:%S") except Exception: return date @@ -76,102 +139,116 @@ def get_twitter_time(date: str) -> str: def get_twitter_user(url_json): user_name = url_json.name user_username = url_json.screen_name - status = '' + status = "" status_link = None - verified = '💎' if url_json.verified else '' - protected = '🔒' if url_json.protected else '' + verified = "💎" if url_json.verified else "" + protected = "🔒" if url_json.protected else "" if url_json.status: status_link = f"https://twitter.com/{user_username}/{url_json.status.id_str}" status = f'🆕 New Status: {get_twitter_time(url_json.status.created_at)}\n' - text = f'Twitter User Info\n\n' \ - f'Name: {verified}{protected}{user_name}\n' \ - f'Username: @{user_username}\n' \ - f'Bio: {url_json.description}\n' \ - f'Joined: {get_twitter_time(url_json.created_at)}\n' \ - f'{status}' \ - f'📤 {url_json.statuses_count} ❤️{url_json.favourites_count} ' \ - f'粉丝 {url_json.followers_count} 关注 {url_json.friends_count}' + text = ( + f"Twitter User Info\n\n" + f"Name: {verified}{protected}{user_name}\n" + f'Username: @{user_username}\n' + f"Bio: {url_json.description}\n" + f"Joined: {get_twitter_time(url_json.created_at)}\n" + f"{status}" + f"📤 {url_json.statuses_count} ❤️{url_json.favourites_count} " + f"粉丝 {url_json.followers_count} 关注 {url_json.friends_count}" + ) return text, user_username, status_link def get_twitter_status(url_json): created_at = get_twitter_time(url_json.created_at) - favorite_count = url_json.favorite_count if hasattr(url_json, 'favorite_count') else 0 - retweet_count = url_json.retweet_count if hasattr(url_json, 'retweet_count') else 0 + favorite_count = ( + url_json.favorite_count if hasattr(url_json, "favorite_count") else 0 + ) + retweet_count = url_json.retweet_count if hasattr(url_json, "retweet_count") else 0 user_name = url_json.user.name user_username = url_json.user.screen_name - text = url_json.full_text if hasattr(url_json, 'full_text') else '暂 无 内 容' - text = f'{text}' - verified = '' - protected = '' + text = url_json.full_text if hasattr(url_json, "full_text") else "暂 无 内 容" + text = f"{text}" + verified = "" + protected = "" if url_json.user.verified: - verified = '💎' + verified = "💎" if url_json.user.protected: - protected = '🔒' - user_text = f'{verified}{protected}{user_name} 发表于 {created_at}' \ - f'\n👍 {favorite_count} 🔁 {retweet_count}' + protected = "🔒" + user_text = ( + f'{verified}{protected}{user_name} 发表于 {created_at}' + f"\n👍 {favorite_count} 🔁 {retweet_count}" + ) media_model = [] media_list = [] media_alt_list = [] with contextlib.suppress(Exception): media_info = url_json.media for i in media_info: - media_url = i.url if hasattr(i, 'url') else None + media_url = i.url if hasattr(i, "url") else None if media_url: - text = text.replace(media_url, '') - if i.type == 'photo': - media_model.append('photo') + text = text.replace(media_url, "") + if i.type == "photo": + media_model.append("photo") media_list.append(i.media_url_https) - elif i.type == 'animated_gif': - media_model.append('gif') - media_list.append(i.video_info['variants'][0]['url']) + elif i.type == "animated_gif": + media_model.append("gif") + media_list.append(i.video_info["variants"][0]["url"]) else: - media_model.append('video') - for f in i.video_info['variants']: - if f['content_type'] == 'video/mp4': - media_list.append(f['url']) + media_model.append("video") + for f in i.video_info["variants"]: + if f["content_type"] == "video/mp4": + media_list.append(f["url"]) break try: media_alt_list.append(i.ext_alt_text) except: - media_alt_list.append('') + media_alt_list.append("") quoted_status = False with contextlib.suppress(Exception): quoted = url_json.quoted_status - quoted_status = quoted.user.screen_name + '/status/' + url_json.quoted_status_id_str + quoted_status = ( + quoted.user.screen_name + "/status/" + url_json.quoted_status_id_str + ) quoted_created_at = get_twitter_time(quoted.created_at) - quoted_favorite_count = quoted.favorite_count if hasattr(quoted, 'favorite_count') else 0 - quoted_retweet_count = quoted.retweet_count if hasattr(quoted, 'retweet_count') else 0 + quoted_favorite_count = ( + quoted.favorite_count if hasattr(quoted, "favorite_count") else 0 + ) + quoted_retweet_count = ( + quoted.retweet_count if hasattr(quoted, "retweet_count") else 0 + ) quoted_user_name = quoted.user.name quoted_user_username = quoted.user.screen_name - quoted_text = quoted.full_text if hasattr(quoted, 'full_text') else '暂 无 内 容' - text += f'\n\n> {quoted_text}' - quoted_verified = '' - quoted_protected = '' + quoted_text = quoted.full_text if hasattr(quoted, "full_text") else "暂 无 内 容" + text += f"\n\n> {quoted_text}" + quoted_verified = "" + quoted_protected = "" if quoted.user.verified: - quoted_verified = '💎' + quoted_verified = "💎" if quoted.user.protected: - quoted_protected = '🔒' - user_text += f'\n> {quoted_verified}{quoted_protected}' \ - f'{quoted_user_name} 发表于 {quoted_created_at}' \ - f'\n👍 {quoted_favorite_count} 🔁 {quoted_retweet_count}' + quoted_protected = "🔒" + user_text += ( + f'\n> {quoted_verified}{quoted_protected}' + f"{quoted_user_name} 发表于 {quoted_created_at}" + f"\n👍 {quoted_favorite_count} 🔁 {quoted_retweet_count}" + ) with contextlib.suppress(Exception): quoted_media_info = quoted.media for i in quoted_media_info: - media_url = i.url if hasattr(i, 'url') else None + media_url = i.url if hasattr(i, "url") else None if media_url: - text = text.replace(media_url, '') - if i.type == 'photo': - media_model.append('photo') + text = text.replace(media_url, "") + if i.type == "photo": + media_model.append("photo") media_list.append(i.media_url_https) - elif i.type == 'animated_gif': - media_model.append('gif') - media_list.append(i.video_info['variants'][0]['url']) + elif i.type == "animated_gif": + media_model.append("gif") + media_list.append(i.video_info["variants"][0]["url"]) else: - media_model.append('video') - media_list.append(i.video_info['variants'][0]['url']) + media_model.append("video") + media_list.append(i.video_info["variants"][0]["url"]) try: media_alt_list.append(i.ext_alt_text) except: - media_alt_list.append('') + media_alt_list.append("") return text, user_text, media_model, media_list, quoted_status diff --git a/headers.py b/headers.py index 746f22e..dca230e 100644 --- a/headers.py +++ b/headers.py @@ -1 +1,3 @@ -headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'} +headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36" +} diff --git a/init.py b/init.py index a6623a3..1273a37 100644 --- a/init.py +++ b/init.py @@ -37,7 +37,9 @@ class UserMe: user_me = UserMe() sqlite = Sqlite() -bot = pyrogram.Client("bot", api_id=api_id, api_hash=api_hash, ipv6=ipv6, plugins=dict(root="modules")) +bot = pyrogram.Client( + "bot", api_id=api_id, api_hash=api_hash, ipv6=ipv6, plugins=dict(root="modules") +) # temp fix topics group setattr(pyrogram.types.Message, "old_parse", getattr(pyrogram.types.Message, "_parse")) setattr(pyrogram.types.Message, "_parse", temp_fix) diff --git a/models/fragment.py b/models/fragment.py index 9c6a09c..9f133a1 100644 --- a/models/fragment.py +++ b/models/fragment.py @@ -74,20 +74,19 @@ class UserName(BaseModel): @property def text(self) -> str: - text = f"用户名:@{self.name}\n" \ - f"状态:{self.status.text}\n" + text = f"用户名:@{self.name}\n" f"状态:{self.status.text}\n" if self.status == AuctionStatus.Available: text += f"最低出价:{self.now_price.text}\n" elif self.status == AuctionStatus.OnAuction: - text += f"目前最高出价:{self.now_price.text}\n" \ - f"距离拍卖结束:{self.end_human_time}\n" + text += f"目前最高出价:{self.now_price.text}\n" f"距离拍卖结束:{self.end_human_time}\n" elif self.status == AuctionStatus.Sold: - text += f"售出价格:{self.now_price.text}\n" \ - f"最终买家:{self.purchaser[:12]}...\n" \ - f"售出时间:{self.strf_end_time}\n" + text += ( + f"售出价格:{self.now_price.text}\n" + f"最终买家:{self.purchaser[:12]}...\n" + f"售出时间:{self.strf_end_time}\n" + ) elif self.status == AuctionStatus.Sale: - text += f"售价:{self.now_price.text}\n" \ - f"距离出售结束:{self.end_human_time}\n" + text += f"售价:{self.now_price.text}\n" f"距离出售结束:{self.end_human_time}\n" return text @@ -117,7 +116,11 @@ class FragmentSub: async def get_by_cid_and_username(cid: int, username: str) -> Optional[Fragment]: async with sqlite.Session() as session: session = cast(AsyncSession, session) - statement = select(Fragment).where(Fragment.cid == cid).where(Fragment.username == username) + statement = ( + select(Fragment) + .where(Fragment.cid == cid) + .where(Fragment.username == username) + ) results = await session.exec(statement) return post[0] if (post := results.first()) else None diff --git a/models/models/fragment.py b/models/models/fragment.py index 311586b..ee933d2 100644 --- a/models/models/fragment.py +++ b/models/models/fragment.py @@ -2,7 +2,7 @@ from sqlmodel import SQLModel, Field class Fragment(SQLModel, table=True): - __table_args__ = dict(mysql_charset='utf8mb4', mysql_collate="utf8mb4_general_ci") + __table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci") cid: int = Field(primary_key=True) username: str = Field(primary_key=True) diff --git a/models/models/lofter.py b/models/models/lofter.py index 9de3ec7..c771054 100644 --- a/models/models/lofter.py +++ b/models/models/lofter.py @@ -2,7 +2,7 @@ from sqlmodel import SQLModel, Field class Lofter(SQLModel, table=True): - __table_args__ = dict(mysql_charset='utf8mb4', mysql_collate="utf8mb4_general_ci") + __table_args__ = dict(mysql_charset="utf8mb4", mysql_collate="utf8mb4_general_ci") user_id: str = Field(primary_key=True) username: str = Field() diff --git a/models/temp_fix.py b/models/temp_fix.py index c4a00d5..3ba579e 100644 --- a/models/temp_fix.py +++ b/models/temp_fix.py @@ -7,12 +7,18 @@ async def temp_fix( users: dict, chats: dict, is_scheduled: bool = False, - replies: int = 1 + replies: int = 1, ): - parsed = await pyrogram.types.Message.old_parse(client, message, users, chats, is_scheduled, replies) # noqa - if isinstance(message, pyrogram.raw.types.Message) and message.reply_to \ - and hasattr(message.reply_to, "forum_topic") and message.reply_to.forum_topic \ - and not message.reply_to.reply_to_top_id: + parsed = await pyrogram.types.Message.old_parse( + client, message, users, chats, is_scheduled, replies + ) # noqa + if ( + isinstance(message, pyrogram.raw.types.Message) + and message.reply_to + and hasattr(message.reply_to, "forum_topic") + and message.reply_to.forum_topic + and not message.reply_to.reply_to_top_id + ): parsed.reply_to_top_message_id = parsed.reply_to_message_id parsed.reply_to_message_id = None parsed.reply_to_message = None diff --git a/modules/anti_channel.py b/modules/anti_channel.py index b7dba84..4b9c2f3 100644 --- a/modules/anti_channel.py +++ b/modules/anti_channel.py @@ -18,8 +18,9 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict): while True: try: # Check for message that are from channel - if (not isinstance(update, types.UpdateNewChannelMessage) or - not isinstance(update.message.from_id, types.PeerChannel)): + if not isinstance(update, types.UpdateNewChannelMessage) or not isinstance( + update.message.from_id, types.PeerChannel + ): raise ContinuePropagation # Basic data @@ -33,9 +34,12 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict): channel_id = int(f"-100{message.from_id.channel_id}") # Check for linked channel - if ((message.fwd_from and - message.fwd_from.saved_from_peer == message.fwd_from.from_id == message.from_id) or - channel_id == chat_id): + if ( + message.fwd_from + and message.fwd_from.saved_from_peer + == message.fwd_from.from_id + == message.from_id + ) or channel_id == chat_id: raise ContinuePropagation # Check if blocked @@ -56,7 +60,7 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict): send_gifs=True, send_games=True, send_polls=True, - ) + ), ) ) await client.delete_messages(chat_id, message.id) @@ -73,8 +77,11 @@ async def anti_channel_msg(client: Client, update: Update, _, __: dict): raise ContinuePropagation from e -@Client.on_message(filters.incoming & filters.group & - filters.command(["anti_channel_msg", f"anti_channel_msg@{user_me.username}"])) +@Client.on_message( + filters.incoming + & filters.group + & filters.command(["anti_channel_msg", f"anti_channel_msg@{user_me.username}"]) +) async def switch_anti_channel_msg(client: Client, message: Message): # Check user if message.from_user: @@ -93,14 +100,18 @@ async def switch_anti_channel_msg(client: Client, message: Message): clean(message.chat.id) await message.reply("Anti-channel is now disabled.") else: - await message.reply("Anti-channel is already enabled.\n" - "\n" - "Tips: Use `/anti_channel_msg true` to enable or disable it.") + await message.reply( + "Anti-channel is already enabled.\n" + "\n" + "Tips: Use `/anti_channel_msg true` to enable or disable it." + ) elif switch: add(message.chat.id, message.chat.id) await message.reply("Anti-channel is now enabled.") else: - await message.reply("Anti-channel is already disabled.\n" - "\n" - "Tips: Use `/anti_channel_msg true` to enable or disable it.") + await message.reply( + "Anti-channel is already disabled.\n" + "\n" + "Tips: Use `/anti_channel_msg true` to enable or disable it." + ) raise ContinuePropagation diff --git a/modules/ask.py b/modules/ask.py index 4b58faa..4fcf204 100644 --- a/modules/ask.py +++ b/modules/ask.py @@ -6,8 +6,7 @@ from pyrogram.types import Message from defs.ask import how_many, what_time, how_long, hif, handle_pers, who -@Client.on_message(filters.incoming & - filters.regex(r"^问")) +@Client.on_message(filters.incoming & filters.regex(r"^问")) async def ask(_: Client, message: Message): msg = message if not message.text: diff --git a/modules/banme.py b/modules/banme.py index 7839d03..de21f97 100644 --- a/modules/banme.py +++ b/modules/banme.py @@ -8,35 +8,46 @@ from pyrogram.types import Message, ChatPermissions from init import user_me -@Client.on_message(filters.incoming & filters.group & - filters.command(["banme", f"banme@{user_me.username}"])) +@Client.on_message( + filters.incoming + & filters.group + & filters.command(["banme", f"banme@{user_me.username}"]) +) async def ban_me_command(client: Client, message: Message): args = str(message.text).strip() # 检查是否有倍数参数 - if multiple_text := re.search(r'^(\d+)倍$', args): + if multiple_text := re.search(r"^(\d+)倍$", args): multiple = int(multiple_text.groups()[0]) else: multiple = 1 # 检查bot和用户身份 - if (await client.get_chat_member(message.chat.id, "self")).status != ChatMemberStatus.ADMINISTRATOR: - await message.reply('Bot非群管理员, 无法执行禁言操作QAQ') + if ( + await client.get_chat_member(message.chat.id, "self") + ).status != ChatMemberStatus.ADMINISTRATOR: + await message.reply("Bot非群管理员, 无法执行禁言操作QAQ") return if not message.from_user: # 频道 - await message.reply('你是个频道, 别来凑热闹OvO') + await message.reply("你是个频道, 别来凑热闹OvO") return - member = (await client.get_chat_member(message.chat.id, message.from_user.id)).status + member = ( + await client.get_chat_member(message.chat.id, message.from_user.id) + ).status if member in [ChatMemberStatus.ADMINISTRATOR, ChatMemberStatus.OWNER]: - await message.reply('你也是个管理, 别来凑热闹OvO') + await message.reply("你也是个管理, 别来凑热闹OvO") return # 随机禁言时间 random_time = 2 * int(random.gauss(128 * multiple, 640 * multiple // 10)) act_time = 60 if random_time < 60 else min(random_time, 2591940) - msg = f'既然你那么想被口球的话, 那我就成全你吧!\n送你一份{act_time // 60}分{act_time % 60}秒禁言套餐哦, 谢谢惠顾~' + msg = f"既然你那么想被口球的话, 那我就成全你吧!\n送你一份{act_time // 60}分{act_time % 60}秒禁言套餐哦, 谢谢惠顾~" - await client.restrict_chat_member(message.chat.id, message.from_user.id, ChatPermissions(), - datetime.now() + timedelta(seconds=act_time)) + await client.restrict_chat_member( + message.chat.id, + message.from_user.id, + ChatPermissions(), + datetime.now() + timedelta(seconds=act_time), + ) await message.reply(msg) diff --git a/modules/bilibili.py b/modules/bilibili.py index cc354ee..86dc84c 100644 --- a/modules/bilibili.py +++ b/modules/bilibili.py @@ -4,15 +4,23 @@ from io import BytesIO from pyrogram import Client, filters, ContinuePropagation from pyrogram.types import Message -from defs.bilibili import b23_extract, video_info_get, binfo_image_create, get_dynamic_screenshot_pc +from defs.bilibili import ( + b23_extract, + video_info_get, + binfo_image_create, + get_dynamic_screenshot_pc, +) from defs.button import gen_button, Button -@Client.on_message(filters.incoming & filters.text & - filters.regex(r"av(\d{1,12})|BV(1[A-Za-z0-9]{2}4.1.7[A-Za-z0-9]{2})|b23.tv")) +@Client.on_message( + filters.incoming + & filters.text + & filters.regex(r"av(\d{1,12})|BV(1[A-Za-z0-9]{2}4.1.7[A-Za-z0-9]{2})|b23.tv") +) async def bili_resolve(_: Client, message: Message): """ - 解析 bilibili 链接 + 解析 bilibili 链接 """ if "b23.tv" in message.text: message.text = await b23_extract(message.text) @@ -26,13 +34,16 @@ async def bili_resolve(_: Client, message: Message): await message.reply_photo( image, quote=True, - reply_markup=gen_button([Button(0, "Link", "https://b23.tv/" + video_info["data"]["bvid"])]) + reply_markup=gen_button( + [Button(0, "Link", "https://b23.tv/" + video_info["data"]["bvid"])] + ), ) raise ContinuePropagation -@Client.on_message(filters.incoming & filters.text & - filters.regex(r"t.bilibili.com/([0-9]*)")) +@Client.on_message( + filters.incoming & filters.text & filters.regex(r"t.bilibili.com/([0-9]*)") +) async def bili_dynamic(_: Client, message: Message): # sourcery skip: use-named-expression p = re.compile(r"t.bilibili.com/([0-9]*)") @@ -47,6 +58,8 @@ async def bili_dynamic(_: Client, message: Message): await message.reply_photo( photo, quote=True, - reply_markup=gen_button([Button(0, "Link", f"https://t.bilibili.com/{dynamic_id}")]) + reply_markup=gen_button( + [Button(0, "Link", f"https://t.bilibili.com/{dynamic_id}")] + ), ) raise ContinuePropagation diff --git a/modules/book_of_answers.py b/modules/book_of_answers.py index 98133a5..c05f295 100644 --- a/modules/book_of_answers.py +++ b/modules/book_of_answers.py @@ -10,15 +10,13 @@ from pyrogram.types import Message book_of_answers: List[str] -with open(f"resources{sep}text{sep}book_of_answers.json", "r", encoding="utf-8") as file: +with open( + f"resources{sep}text{sep}book_of_answers.json", "r", encoding="utf-8" +) as file: book_of_answers = json.load(file) -@Client.on_message(filters.incoming & - filters.regex(r"^答案之书$")) +@Client.on_message(filters.incoming & filters.regex(r"^答案之书$")) async def book_of_answer(_: Client, message: Message): - await message.reply_text( - f"{choice(book_of_answers)}", - quote=True - ) + await message.reply_text(f"{choice(book_of_answers)}", quote=True) raise ContinuePropagation diff --git a/modules/dc.py b/modules/dc.py index 1210bc0..97869f4 100644 --- a/modules/dc.py +++ b/modules/dc.py @@ -5,7 +5,11 @@ from init import user_me def mention_chat(chat: Chat) -> str: - return f'{chat.title}' if chat.username else chat.title + return ( + f'{chat.title}' + if chat.username + else chat.title + ) def get_dc(message: Message): @@ -27,15 +31,18 @@ def get_dc(message: Message): return dc, mention -@Client.on_message(filters.incoming & - filters.command(["dc", f"dc@{user_me.username}"])) +@Client.on_message(filters.incoming & filters.command(["dc", f"dc@{user_me.username}"])) async def dc_command(_: Client, message: Message): - geo_dic = {'1': '美国-佛罗里达州-迈阿密', '2': '荷兰-阿姆斯特丹', '3': '美国-佛罗里达州-迈阿密', - '4': '荷兰-阿姆斯特丹', '5': '新加坡'} + geo_dic = { + "1": "美国-佛罗里达州-迈阿密", + "2": "荷兰-阿姆斯特丹", + "3": "美国-佛罗里达州-迈阿密", + "4": "荷兰-阿姆斯特丹", + "5": "新加坡", + } dc, mention = get_dc(message) if dc: - text = f"{mention}所在数据中心为: DC{dc}\n" \ - f"该数据中心位于 {geo_dic[str(dc)]}" + text = f"{mention}所在数据中心为: DC{dc}\n" f"该数据中心位于 {geo_dic[str(dc)]}" else: text = f"{mention}需要先设置头像并且对我可见。" await message.reply(text) diff --git a/modules/exchange.py b/modules/exchange.py index 5517b0d..6cac6aa 100644 --- a/modules/exchange.py +++ b/modules/exchange.py @@ -12,29 +12,34 @@ async def exchange_refresh() -> None: await exchange_client.refresh() -@Client.on_message(filters.incoming & - filters.command(["exchange", f"exchange@{user_me.username}"])) +@Client.on_message( + filters.incoming & filters.command(["exchange", f"exchange@{user_me.username}"]) +) async def exchange_command(_: Client, message: Message): if not exchange_client.inited: await exchange_client.refresh() if not exchange_client.inited: return await message.reply("获取汇率数据出现错误!") text = await exchange_client.check_ex(message) - if text == 'help': - text = '该指令可用于查询汇率。\n使用方式举例:\n/exchange USD CNY - 1 USD 等于多少 CNY\n' \ - '/exchange 11 CNY USD - 11 CNY 等于多少 USD' - elif text == 'ValueError': - text = '金额不合法' - elif text == 'ValueBig': - text = '我寻思你也没这么多钱啊。' - elif text == 'ValueSmall': - text = '小数点后的数字咋这么多?' - elif text == 'FromError': - text = '不支持的起源币种。' - elif text == 'ToError': - text = '不支持的目标币种。' + if text == "help": + text = ( + "该指令可用于查询汇率。\n使用方式举例:\n/exchange USD CNY - 1 USD 等于多少 CNY\n" + "/exchange 11 CNY USD - 11 CNY 等于多少 USD" + ) + elif text == "ValueError": + text = "金额不合法" + elif text == "ValueBig": + text = "我寻思你也没这么多钱啊。" + elif text == "ValueSmall": + text = "小数点后的数字咋这么多?" + elif text == "FromError": + text = "不支持的起源币种。" + elif text == "ToError": + text = "不支持的目标币种。" else: return await message.reply(text) reply_ = await message.reply(text) if message.chat.type == ChatType.PRIVATE: - await reply_.reply('支持货币: ' + ', '.join(exchange_client.currencies) + '') + await reply_.reply( + "支持货币: " + ", ".join(exchange_client.currencies) + "" + ) diff --git a/modules/fragment.py b/modules/fragment.py index bd9eced..7e44eef 100644 --- a/modules/fragment.py +++ b/modules/fragment.py @@ -3,8 +3,14 @@ import re from pyrogram import Client, filters, ContinuePropagation from pyrogram.enums import ChatMemberStatus -from pyrogram.types import InlineQuery, InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardMarkup, \ - InlineKeyboardButton, Message +from pyrogram.types import ( + InlineQuery, + InlineQueryResultArticle, + InputTextMessageContent, + InlineKeyboardMarkup, + InlineKeyboardButton, + Message, +) from models.fragment import FragmentSubText, FragmentSub, AuctionStatus from defs.fragment import parse_fragment, NotAvailable, parse_sub @@ -14,7 +20,9 @@ from scheduler import scheduler, add_delete_message_job QUERY_PATTERN = re.compile(r"^@\w[a-zA-Z0-9_]{3,32}$") -@Client.on_message(filters.incoming & filters.command(["username", f"username@{user_me.username}"])) +@Client.on_message( + filters.incoming & filters.command(["username", f"username@{user_me.username}"]) +) async def fragment_command(client: Client, message: Message): status = None user = None @@ -70,8 +78,7 @@ async def fragment_inline(_, inline_query: InlineQuery): user = await parse_fragment(username) text = user.text except NotAvailable: - text = f"用户名:@{username}\n" \ - f"状态:暂未开放购买\n" + text = f"用户名:@{username}\n" f"状态:暂未开放购买\n" except Exception: text = "" if not text: @@ -89,19 +96,20 @@ async def fragment_inline(_, inline_query: InlineQuery): description="点击发送详情", reply_markup=InlineKeyboardMarkup( [ - [InlineKeyboardButton( - "Open", - url=f"https://fragment.com/username/{username}" - )] + [ + InlineKeyboardButton( + "Open", url=f"https://fragment.com/username/{username}" + ) + ] ] - ) + ), ), ] await inline_query.answer( results=results, switch_pm_text="查询成功", switch_pm_parameter="start", - cache_time=0 + cache_time=0, ) diff --git a/modules/friend_say.py b/modules/friend_say.py index fb9204f..ade407d 100644 --- a/modules/friend_say.py +++ b/modules/friend_say.py @@ -8,8 +8,7 @@ from pyrogram.types import Message from defs.friend_say import ImageUtil -@Client.on_message(filters.incoming & filters.group & - filters.regex(r"^我有个朋友")) +@Client.on_message(filters.incoming & filters.group & filters.regex(r"^我有个朋友")) async def friend_say(client: Client, message: Message): if not message.reply_to_message: raise ContinuePropagation @@ -20,13 +19,14 @@ async def friend_say(client: Client, message: Message): # Get Gravatar avatar = None if message.reply_to_message.from_user.photo: - avatar = await client.download_media(message.reply_to_message.from_user.photo.big_file_id, - file_name="avatar.jpg") + avatar = await client.download_media( + message.reply_to_message.from_user.photo.big_file_id, file_name="avatar.jpg" + ) # Get Name user_name = message.reply_to_message.from_user.first_name # Create image if avatar: - with open(avatar, 'rb') as fh: + with open(avatar, "rb") as fh: buf = BytesIO(fh.read()) ava = ImageUtil(100, 100, background=buf) else: diff --git a/modules/geo.py b/modules/geo.py index 62dc064..c597766 100644 --- a/modules/geo.py +++ b/modules/geo.py @@ -12,11 +12,12 @@ from init import user_me, request REQUEST_URL = f"https://restapi.amap.com/v3/geocode/geo?key={amap_key}&" -@Client.on_message(filters.incoming & - filters.command(["geo", f"geo@{user_me.username}"])) +@Client.on_message( + filters.incoming & filters.command(["geo", f"geo@{user_me.username}"]) +) async def geo_command(_: Client, message: Message): if len(message.command) <= 1: - await message.reply('没有找到要查询的中国 经纬度/地址 ...') + await message.reply("没有找到要查询的中国 经纬度/地址 ...") return mode, lat, lon = "address", 0, 0 # noqa with contextlib.suppress(ValueError, IndexError): @@ -24,7 +25,11 @@ async def geo_command(_: Client, message: Message): mode = "location" if mode == "location": try: - geo = (await request.get(f"{REQUEST_URL.replace('/geo?', '/regeo?')}location={lat},{lon}")).json() + geo = ( + await request.get( + f"{REQUEST_URL.replace('/geo?', '/regeo?')}location={lat},{lon}" + ) + ).json() formatted_address = geo["regeocode"]["formatted_address"] assert isinstance(formatted_address, str) except (KeyError, AssertionError): @@ -32,12 +37,19 @@ async def geo_command(_: Client, message: Message): return else: try: - geo = (await request.get(f"{REQUEST_URL}address={quote(' '.join(message.command[1:]).strip())}")).json() + geo = ( + await request.get( + f"{REQUEST_URL}address={quote(' '.join(message.command[1:]).strip())}" + ) + ).json() formatted_address = geo["geocodes"][0]["formatted_address"] lat, lon = geo["geocodes"][0]["location"].split(",") except (KeyError, IndexError, ValueError): - await message.reply(f"无法解析地址 {' '.join(message.command[1:]).strip()}", quote=True) + await message.reply( + f"无法解析地址 {' '.join(message.command[1:]).strip()}", quote=True + ) return - msg = await message.reply_location(longitude=float(lat), latitude=float(lon), quote=True) - await msg.reply(f"坐标:`{lat},{lon}`\n" - f"地址:{formatted_address}", quote=True) + msg = await message.reply_location( + longitude=float(lat), latitude=float(lon), quote=True + ) + await msg.reply(f"坐标:`{lat},{lon}`\n" f"地址:{formatted_address}", quote=True) diff --git a/modules/guess.py b/modules/guess.py index f887f6e..de7fbab 100644 --- a/modules/guess.py +++ b/modules/guess.py @@ -5,23 +5,26 @@ from defs.guess import guess_str from init import user_me -@Client.on_message(filters.incoming & - filters.command(["guess", f"guess@{user_me.username}"])) +@Client.on_message( + filters.incoming & filters.command(["guess", f"guess@{user_me.username}"]) +) async def guess_command(_: Client, message: Message): - msg = await message.reply('正在查询中...') - if str(message.text.split()) == 1: - text = await guess_str(message.reply_to_message.text) - if text == '': - text = '没有匹配到拼音首字母缩写' + msg = await message.reply("正在查询中...") + if len(message.text.split()) == 1: + text = "" + if reply := message.reply_to_message: + text = await guess_str(reply.text) + if text == "": + text = "没有匹配到拼音首字母缩写" await msg.edit(text) else: - rep_text = '' + rep_text = "" if reply := message.reply_to_message: rep_text += await guess_str(reply.text) text = await guess_str(message.text[7:]) if not rep_text and not text: - await msg.edit('没有匹配到拼音首字母缩写') + await msg.edit("没有匹配到拼音首字母缩写") elif not rep_text: - await msg.edit(f'{text}') + await msg.edit(f"{text}") else: - await msg.edit(f'{rep_text}{text}') + await msg.edit(f"{rep_text}{text}") diff --git a/modules/ip.py b/modules/ip.py index 35360c8..0012e13 100644 --- a/modules/ip.py +++ b/modules/ip.py @@ -7,64 +7,82 @@ from defs.ip import ip_info from init import user_me, request -@Client.on_message(filters.incoming & - filters.command(["ip", f"ip@{user_me.username}"])) +@Client.on_message(filters.incoming & filters.command(["ip", f"ip@{user_me.username}"])) async def ip_command(_: Client, message: Message): - msg = await message.reply('正在查询中...') - rep_text = '' + msg = await message.reply("正在查询中...") + rep_text = "" reply = message.reply_to_message if reply: if reply.entities: for num in range(0, len(reply.entities)): url = reply.text[ - reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length] + reply.entities[num].offset : reply.entities[num].offset + + reply.entities[num].length + ] url = urlparse(url) if url.hostname or url.path: if url.hostname: url = url.hostname else: url = url.path - ipinfo_json = (await request.get( - "http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city," - "lat,lon,isp," - "org,as,mobile,proxy,hosting,query")).json() - if ipinfo_json['status'] == 'fail': + ipinfo_json = ( + await request.get( + "http://ip-api.com/json/" + + url + + "?fields=status,message,country,regionName,city," + "lat,lon,isp," + "org,as,mobile,proxy,hosting,query" + ) + ).json() + if ipinfo_json["status"] == "fail": pass - elif ipinfo_json['status'] == 'success': + elif ipinfo_json["status"] == "success": rep_text = ip_info(url, ipinfo_json) - text = '' + text = "" if message.entities: for num in range(0, len(message.entities)): url = message.text[ - message.entities[num].offset:message.entities[num].offset + message.entities[num].length] + message.entities[num].offset : message.entities[num].offset + + message.entities[num].length + ] url = urlparse(url) if url.hostname or url.path: if url.hostname: url = url.hostname else: url = url.path - ipinfo_json = (await request.get( - "http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat," - "lon,isp," - "org,as,mobile,proxy,hosting,query")).json() - if ipinfo_json['status'] == 'fail': + ipinfo_json = ( + await request.get( + "http://ip-api.com/json/" + + url + + "?fields=status,message,country,regionName,city,lat," + "lon,isp," + "org,as,mobile,proxy,hosting,query" + ) + ).json() + if ipinfo_json["status"] == "fail": pass - elif ipinfo_json['status'] == 'success': + elif ipinfo_json["status"] == "success": text = ip_info(url, ipinfo_json) - if text == '': + if text == "": url = message.text[4:] - if not url == '': - ipinfo_json = (await request.get( - "http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat," - "lon,isp," - "org,as,mobile,proxy,hosting,query")).json() - if ipinfo_json['status'] == 'fail': + if not url == "": + ipinfo_json = ( + await request.get( + "http://ip-api.com/json/" + + url + + "?fields=status,message,country,regionName,city,lat," + "lon,isp," + "org,as,mobile,proxy,hosting,query" + ) + ).json() + if ipinfo_json["status"] == "fail": pass - elif ipinfo_json['status'] == 'success': + elif ipinfo_json["status"] == "success": text = ip_info(url, ipinfo_json) - if rep_text == '' and text == '': - await msg.edit('没有找到要查询的 ip/域名 ...') - elif not rep_text == '' and not text == '': - await msg.edit(f'{rep_text}\n================\n{text}') + if rep_text == "" and text == "": + await msg.edit("没有找到要查询的 ip/域名 ...") + elif not rep_text == "" and not text == "": + await msg.edit(f"{rep_text}\n================\n{text}") else: - await msg.edit(f'{rep_text}{text}') + await msg.edit(f"{rep_text}{text}") diff --git a/modules/lofter.py b/modules/lofter.py index d580545..2177e8d 100644 --- a/modules/lofter.py +++ b/modules/lofter.py @@ -5,8 +5,7 @@ from pyrogram.types import Message from defs.lofter import get_loft, input_media, get_loft_user, lofter_user_link -@Client.on_message(filters.incoming & filters.text & - filters.regex(r"lofter.com")) +@Client.on_message(filters.incoming & filters.text & filters.regex(r"lofter.com")) async def lofter_share(_: Client, message: Message): if not message.text: return @@ -15,7 +14,7 @@ async def lofter_share(_: Client, message: Message): for num in range(len(message.entities)): entity = message.entities[num] if entity.type == MessageEntityType.URL: - url = message.text[entity.offset:entity.offset + entity.length] + url = message.text[entity.offset : entity.offset + entity.length] elif entity.type == MessageEntityType.TEXT_LINK: url = entity.url else: @@ -27,7 +26,9 @@ async def lofter_share(_: Client, message: Message): if len(img) == 1: await img[0].reply_to(message, static=static) else: - await message.reply_media_group(media=await input_media(img[:9], static), quote=True) + await message.reply_media_group( + media=await input_media(img[:9], static), quote=True + ) elif "front/blog" not in url: text, avatar, username, status_link = await get_loft_user(url) if avatar: @@ -35,14 +36,14 @@ async def lofter_share(_: Client, message: Message): avatar, caption=text, quote=True, - reply_markup=lofter_user_link(username, status_link) + reply_markup=lofter_user_link(username, status_link), ) else: await message.reply_text( text, quote=True, disable_web_page_preview=True, - reply_markup=lofter_user_link(username, status_link) + reply_markup=lofter_user_link(username, status_link), ) except Exception as e: print(e) diff --git a/modules/luxun.py b/modules/luxun.py index 8d80b06..9eb15dd 100644 --- a/modules/luxun.py +++ b/modules/luxun.py @@ -4,8 +4,7 @@ from pyrogram.types import Message from defs.luxun import process_pic -@Client.on_message(filters.incoming & - filters.regex(r"^鲁迅说过")) +@Client.on_message(filters.incoming & filters.regex(r"^鲁迅说过")) async def luxun_say(_: Client, message: Message): args = message.text[4:] if not args: diff --git a/modules/mihoyo_bbs.py b/modules/mihoyo_bbs.py index 028d623..15eee2e 100644 --- a/modules/mihoyo_bbs.py +++ b/modules/mihoyo_bbs.py @@ -10,17 +10,20 @@ from defs.mihoyo_bbs import get_mihoyo_screenshot from defs.button import gen_button, Button -@Client.on_message(filters.incoming & filters.text & - filters.regex(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+')) +@Client.on_message( + filters.incoming + & filters.text + & filters.regex(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+") +) async def bili_dynamic(_: Client, message: Message): # sourcery skip: use-named-expression try: - p = re.compile(r'(https://)?(m\.)?bbs.mihoyo.com/.+/article/\d+') + p = re.compile(r"(https://)?(m\.)?www.miyoushe.com/.+/article/\d+") article = p.search(message.text) if article: article_url = article.group() - if not article_url.startswith(('https://', 'http://')): - article_url = f'https://{article_url}' + if not article_url.startswith(("https://", "http://")): + article_url = f"https://{article_url}" image = await get_mihoyo_screenshot(article_url) if image: # 将bytes结果转化为字节流 @@ -32,13 +35,13 @@ async def bili_dynamic(_: Client, message: Message): await message.reply_document( document=photo, quote=True, - reply_markup=gen_button([Button(0, "Link", article_url)]) + reply_markup=gen_button([Button(0, "Link", article_url)]), ) else: await message.reply_photo( photo, quote=True, - reply_markup=gen_button([Button(0, "Link", article_url)]) + reply_markup=gen_button([Button(0, "Link", article_url)]), ) except Exception as e: print(f"截取米哈游帖子时发生错误:{e}") diff --git a/modules/post.py b/modules/post.py index 0941dd6..ce883fb 100644 --- a/modules/post.py +++ b/modules/post.py @@ -5,11 +5,15 @@ from pyrogram.types import Message from defs.post import LofterPost -@Client.on_message(filters.incoming & filters.private & filters.user(admin) & - filters.command(["lofter_post"])) +@Client.on_message( + filters.incoming + & filters.private + & filters.user(admin) + & filters.command(["lofter_post"]) +) async def lofter_post_command(client: Client, message: Message): """ - 抓取 lofter 粮单 + 抓取 lofter 粮单 """ data = message.text.split(" ") offset = 0 diff --git a/modules/repeater.py b/modules/repeater.py index 189cfef..2669b87 100644 --- a/modules/repeater.py +++ b/modules/repeater.py @@ -24,7 +24,12 @@ async def repeater_handler(client: Client, message: Message): msg = t_msg = message.text if not msg: raise ContinuePropagation - if msg.startswith("/") or msg.startswith("!") or msg.startswith(",") or msg.startswith(","): + if ( + msg.startswith("/") + or msg.startswith("!") + or msg.startswith(",") + or msg.startswith(",") + ): raise ContinuePropagation if msg != last_msg[group_id] or msg == last_repeat_msg[group_id]: diff --git a/modules/start.py b/modules/start.py index 13f34b8..f9dfda6 100644 --- a/modules/start.py +++ b/modules/start.py @@ -16,14 +16,18 @@ des = """本机器人特性: """ -@Client.on_message(filters.incoming & filters.private & - filters.command(["start"])) +@Client.on_message(filters.incoming & filters.private & filters.command(["start"])) async def start_command(_: Client, message: Message): """ - 回应机器人信息 + 回应机器人信息 """ - await message.reply(des, - quote=True, - reply_markup=gen_button( - [Button(0, "Gitlab", "https://gitlab.com/Xtao-Labs/iShotaBot"), - Button(0, "Github", "https://github.com/Xtao-Labs/iShotaBot")])) + await message.reply( + des, + quote=True, + reply_markup=gen_button( + [ + Button(0, "Gitlab", "https://gitlab.com/Xtao-Labs/iShotaBot"), + Button(0, "Github", "https://github.com/Xtao-Labs/iShotaBot"), + ] + ), + ) diff --git a/modules/twitter_api.py b/modules/twitter_api.py index f6f5895..50220cb 100644 --- a/modules/twitter_api.py +++ b/modules/twitter_api.py @@ -6,12 +6,17 @@ from pyrogram import Client, filters, ContinuePropagation from pyrogram.enums import MessageEntityType, ParseMode from pyrogram.types import Message -from defs.twitter_api import twitter_api, get_twitter_status, twitter_link, twitter_media, twitter_user_link, \ - get_twitter_user +from defs.twitter_api import ( + twitter_api, + get_twitter_status, + twitter_link, + twitter_media, + twitter_user_link, + get_twitter_user, +) -@Client.on_message(filters.incoming & filters.text & - filters.regex(r"twitter.com/")) +@Client.on_message(filters.incoming & filters.text & filters.regex(r"twitter.com/")) async def twitter_share(client: Client, message: Message): if not message.text: return @@ -20,21 +25,27 @@ async def twitter_share(client: Client, message: Message): for num in range(len(message.entities)): entity = message.entities[num] if entity.type == MessageEntityType.URL: - url = message.text[entity.offset:entity.offset + entity.length] + url = message.text[entity.offset : entity.offset + entity.length] elif entity.type == MessageEntityType.TEXT_LINK: url = entity.url else: continue url = urlparse(url) if url.hostname and url.hostname in ["twitter.com", "vxtwitter.com"]: - if url.path.find('status') >= 0: - status_id = str(url.path[url.path.find('status') + 7:].split("/")[0]).split("?")[0] + if url.path.find("status") >= 0: + status_id = str( + url.path[url.path.find("status") + 7 :].split("/")[0] + ).split("?")[0] url_json = None with ThreadPoolExecutor() as executor: for _ in range(3): try: - future = client.loop.run_in_executor(executor, twitter_api.GetStatus, status_id) - url_json = await asyncio.wait_for(future, timeout=30, loop=client.loop) + future = client.loop.run_in_executor( + executor, twitter_api.GetStatus, status_id + ) + url_json = await asyncio.wait_for( + future, timeout=30, loop=client.loop + ) except Exception as e: print(e) return @@ -42,15 +53,24 @@ async def twitter_share(client: Client, message: Message): break if not url_json: return - text, user_text, media_model, media_list, quoted_status = get_twitter_status(url_json) - text = f'Twitter Status Info\n\n{text}\n\n{user_text}' + ( + text, + user_text, + media_model, + media_list, + quoted_status, + ) = get_twitter_status(url_json) + text = f"Twitter Status Info\n\n{text}\n\n{user_text}" if len(media_model) == 0: await client.send_message( - message.chat.id, text, + message.chat.id, + text, parse_mode=ParseMode.HTML, disable_web_page_preview=True, reply_to_message_id=message.id, - reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name) + reply_markup=twitter_link( + url_json.id, quoted_status, url_json.user.screen_name + ), ) elif len(media_model) == 1: if static: @@ -59,23 +79,35 @@ async def twitter_share(client: Client, message: Message): caption=text, quote=True, parse_mode=ParseMode.HTML, - reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name) + reply_markup=twitter_link( + url_json.id, + quoted_status, + url_json.user.screen_name, + ), ) - elif media_model[0] == 'photo': + elif media_model[0] == "photo": await message.reply_photo( media_list[0], caption=text, parse_mode=ParseMode.HTML, quote=True, - reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name) + reply_markup=twitter_link( + url_json.id, + quoted_status, + url_json.user.screen_name, + ), ) - elif media_model[0] == 'gif': + elif media_model[0] == "gif": await message.reply_animation( media_list[0], caption=text, parse_mode=ParseMode.HTML, quote=True, - reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name) + reply_markup=twitter_link( + url_json.id, + quoted_status, + url_json.user.screen_name, + ), ) else: await message.reply_video( @@ -83,22 +115,32 @@ async def twitter_share(client: Client, message: Message): caption=text, parse_mode=ParseMode.HTML, quote=True, - reply_markup=twitter_link(url_json.id, quoted_status, url_json.user.screen_name) + reply_markup=twitter_link( + url_json.id, + quoted_status, + url_json.user.screen_name, + ), ) else: - await client.send_media_group(message.chat.id, - media=twitter_media(text, media_model, media_list, static)) - elif url.path == '/': + await client.send_media_group( + message.chat.id, + media=twitter_media(text, media_model, media_list, static), + ) + elif url.path == "/": return else: # 解析用户 - uid = url.path.replace('/', '') + uid = url.path.replace("/", "") url_json = None with ThreadPoolExecutor() as executor: for _ in range(3): try: - future = client.loop.run_in_executor(executor, twitter_api.GetUser, None, uid) - url_json = await asyncio.wait_for(future, timeout=30, loop=client.loop) + future = client.loop.run_in_executor( + executor, twitter_api.GetUser, None, uid + ) + url_json = await asyncio.wait_for( + future, timeout=30, loop=client.loop + ) except Exception as e: print(e) return @@ -108,10 +150,10 @@ async def twitter_share(client: Client, message: Message): return text, user_username, status_link = get_twitter_user(url_json) await message.reply_photo( - url_json.profile_image_url_https.replace('_normal', ''), + url_json.profile_image_url_https.replace("_normal", ""), caption=text, quote=True, - reply_markup=twitter_user_link(user_username, status_link) + reply_markup=twitter_user_link(user_username, status_link), ) except Exception as e: print(e) diff --git a/requirements.txt b/requirements.txt index 22a75df..cc65f73 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pyrogram==2.0.63 +pyrogram==2.0.96 tgcrypto==1.2.5 httpx pillow diff --git a/scheduler.py b/scheduler.py index 15901ca..6092264 100644 --- a/scheduler.py +++ b/scheduler.py @@ -17,9 +17,12 @@ async def delete_message(message: Message) -> bool: def add_delete_message_job(message: Message, delete_seconds: int = 60): scheduler.add_job( - delete_message, "date", + delete_message, + "date", id=f"{message.chat.id}|{message.id}|delete_message", name=f"{message.chat.id}|{message.id}|delete_message", args=[message], - run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai")) + datetime.timedelta(seconds=delete_seconds), - replace_existing=True) + run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai")) + + datetime.timedelta(seconds=delete_seconds), + replace_existing=True, + )