From 7762986d6326dc2ff38caf783955ef73c5d4a9fb Mon Sep 17 00:00:00 2001 From: iwumingz Date: Mon, 11 Apr 2022 14:08:50 +0800 Subject: [PATCH] Add ipcheck plugin --- data/command.yml | 5 +++ plugins/google.py | 2 +- plugins/ping.py | 29 ++++++++++++ tools/googles.py | 25 ----------- tools/poster.py | 110 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 26 deletions(-) delete mode 100644 tools/googles.py create mode 100644 tools/poster.py diff --git a/data/command.yml b/data/command.yml index 2537152..c23200d 100644 --- a/data/command.yml +++ b/data/command.yml @@ -161,3 +161,8 @@ ip: cmd: ip format: -ip usage: 查询IP地址或域名的信息 + +ipcheck: + cmd: ipcheck + format: -ipcheck <端口|无> + usage: 无端口参数时,查询IP或域名是否被阻断;有则查询端口是否开启 diff --git a/plugins/google.py b/plugins/google.py index 860547a..ec9a6a5 100644 --- a/plugins/google.py +++ b/plugins/google.py @@ -6,7 +6,7 @@ from loguru import logger from pyrogram import Client from pyrogram.errors import FloodWait from pyrogram.types import Message -from tools.googles import google_search +from tools.poster import google_search from tools.helpers import Parameters, show_cmd_tip, show_exception diff --git a/plugins/ping.py b/plugins/ping.py index 0cd7f6f..c2655a6 100644 --- a/plugins/ping.py +++ b/plugins/ping.py @@ -1,7 +1,9 @@ +from loguru import logger from core import command from pyrogram import Client from pyrogram.types import Message from tools.helpers import Parameters, show_cmd_tip, show_exception +from tools.poster import check_ip, check_ip_port, process_check_data from tools.sessions import session @@ -29,3 +31,30 @@ async def ip(_: Client, msg: Message): return await show_exception(msg, e) else: await msg.edit_text(text) + + +@Client.on_message(command("ipcheck")) +async def ip_checker(_: Client, msg: Message): + """检测IP或者域名是否被阻断""" + cmd, args = Parameters.get_more(msg) + if len(args) == 1: + try: + resp = await check_ip(args[0]) + except Exception as e: + logger.error(e) + return await show_exception(msg, e) + elif len(args) == 2: + try: + resp = await check_ip_port(args[0], args[1]) + except Exception as e: + logger.error(e) + return await show_exception(msg, e) + else: + return await show_cmd_tip(msg, cmd) + + try: + res = await process_check_data(len(args), resp=resp) + await msg.edit_text(f"🔎 Query `{' '.join(args)}`\n{res}") + except Exception as e: + logger.error(e) + await show_exception(msg, e) diff --git a/tools/googles.py b/tools/googles.py deleted file mode 100644 index 8899a4f..0000000 --- a/tools/googles.py +++ /dev/null @@ -1,25 +0,0 @@ -from typing import Dict -from urllib import parse - -from bs4 import BeautifulSoup -from loguru import logger - -from .sessions import session - - -async def google_search(content: str) -> Dict[str, str]: - result: Dict[str, str] = {} - async with session.get( - f"https://www.google.com/search?q={parse.quote(content)}", timeout=9.9 - ) as resp: - if resp.status == 200: - soup = BeautifulSoup(await resp.text(), 'lxml') - for p in soup.find_all('h3'): - if p.parent.has_attr('href'): - result[p.text] = p.parent.attrs.get('href') - logger.info(f"Google | Searching | {result[p.text]}") - if len(result) > 10: - break - return result - - resp.raise_for_status() diff --git a/tools/poster.py b/tools/poster.py new file mode 100644 index 0000000..3ef84ee --- /dev/null +++ b/tools/poster.py @@ -0,0 +1,110 @@ +import json +from time import time +from typing import Any, Dict +from urllib import parse + +from bs4 import BeautifulSoup +from loguru import logger + +from .sessions import session + + +async def google_search(content: str) -> Dict[str, str]: + result: Dict[str, str] = {} + async with session.get( + f"https://www.google.com/search?q={parse.quote(content)}", timeout=9.9 + ) as resp: + if resp.status == 200: + soup = BeautifulSoup(await resp.text(), 'lxml') + for p in soup.find_all('h3'): + if p.parent.has_attr('href'): + result[p.text] = p.parent.attrs.get('href') + logger.info(f"Google | Searching | {result[p.text]}") + if len(result) > 10: + break + return result + + resp.raise_for_status() + + +async def check_ip(ip: str) -> Dict[str, Any]: + # ------------- ip check -------------- + url = "https://www.vps234.com" + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36", + 'origin': url, + 'referer': f'{url}/ipchecker/', + 'x-requested-with': 'XMLHttpRequest' + } + + async with session.post( + f"{url}/ipcheck/getdata/", + data={ + 'idName': f'itemblockid{int(round(time() * 1000))}', + 'ip': ip, + }, + headers=headers + ) as resp: + if resp.status == 200: + return await resp.json() + + resp.raise_for_status() + + +async def check_ip_port(ip: str, port: str) -> Dict[str, str]: + # ------------- ip check -------------- + url = "https://www.toolsdaquan.com/toolapi/public/ipchecking" + headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36", + 'referer': 'https://www.toolsdaquan.com/ipcheck/', + 'x-requested-with': 'XMLHttpRequest' + } + + async with session.post(f"{url}/{ip}/{port}", headers=headers) as resp: + if resp.status == 200: + # inner_data = await resp.json() + inner_data = json.loads(await resp.text()) + else: + resp.raise_for_status() + + async with session.post(f"{url}2/{ip}/{port}", headers=headers) as resp: + if resp.status == 200: + # outer_data = await resp.json() + outer_data = json.loads(await resp.text()) + inner_data.update(outer_data) + else: + resp.raise_for_status() + return inner_data + + +async def process_check_data(opt: int, resp: Dict[str, Any]) -> str: + print(resp) + + if opt == 1: + data = resp.get('data') + if resp.get('error') or not data.get('success'): + return f"⚠️ Api Connection failed. Message is `{resp.get('msg')}`" + _data = data.get('data') + in_icmp = "✅" if _data.get('innerICMP') else "❌" + in_tcp = "✅" if _data.get('innerTCP') else "❌" + out_icmp = "✅" if _data.get('outICMP') else "❌" + out_tcp = "✅" if _data.get('outTCP') else "❌" + return f"```Inner ICMP:{in_icmp}\n" \ + f"Inner TCP: {in_tcp}\n" \ + f"Outer ICMP:{out_icmp}\n" \ + f"Outer TCP: {out_tcp}```" + + elif opt == 2: + def is_opened(key): + return resp.get(key) == 'success' + + in_icmp = "✅" if is_opened('icmp') else "❌" + in_tcp = "✅" if is_opened('tcp') else "❌" + out_icmp = "✅" if is_opened('outside_icmp') else "❌" + out_tcp = "✅" if is_opened('outside_tcp') else "❌" + return f"```Inner ICMP:{in_icmp}\n" \ + f"Inner TCP: {in_tcp}\n" \ + f"Outer ICMP:{out_icmp}\n" \ + f"Outer TCP: {out_tcp}```"