Add ipcheck plugin

This commit is contained in:
iwumingz 2022-04-11 14:08:50 +08:00
parent 2dd1a57d2c
commit 7762986d63
5 changed files with 145 additions and 26 deletions

View File

@ -161,3 +161,8 @@ ip:
cmd: ip
format: -ip <IP地址|域名|me>
usage: 查询IP地址或域名的信息
ipcheck:
cmd: ipcheck
format: -ipcheck <IP|域名> <端口|无>
usage: 无端口参数时查询IP或域名是否被阻断有则查询端口是否开启

View File

@ -6,7 +6,7 @@ from loguru import logger
from pyrogram import Client
from pyrogram.errors import FloodWait
from pyrogram.types import Message
from tools.googles import google_search
from tools.poster import google_search
from tools.helpers import Parameters, show_cmd_tip, show_exception

View File

@ -1,7 +1,9 @@
from loguru import logger
from core import command
from pyrogram import Client
from pyrogram.types import Message
from tools.helpers import Parameters, show_cmd_tip, show_exception
from tools.poster import check_ip, check_ip_port, process_check_data
from tools.sessions import session
@ -29,3 +31,30 @@ async def ip(_: Client, msg: Message):
return await show_exception(msg, e)
else:
await msg.edit_text(text)
@Client.on_message(command("ipcheck"))
async def ip_checker(_: Client, msg: Message):
"""检测IP或者域名是否被阻断"""
cmd, args = Parameters.get_more(msg)
if len(args) == 1:
try:
resp = await check_ip(args[0])
except Exception as e:
logger.error(e)
return await show_exception(msg, e)
elif len(args) == 2:
try:
resp = await check_ip_port(args[0], args[1])
except Exception as e:
logger.error(e)
return await show_exception(msg, e)
else:
return await show_cmd_tip(msg, cmd)
try:
res = await process_check_data(len(args), resp=resp)
await msg.edit_text(f"🔎 Query `{' '.join(args)}`\n{res}")
except Exception as e:
logger.error(e)
await show_exception(msg, e)

View File

@ -1,25 +0,0 @@
from typing import Dict
from urllib import parse
from bs4 import BeautifulSoup
from loguru import logger
from .sessions import session
async def google_search(content: str) -> Dict[str, str]:
result: Dict[str, str] = {}
async with session.get(
f"https://www.google.com/search?q={parse.quote(content)}", timeout=9.9
) as resp:
if resp.status == 200:
soup = BeautifulSoup(await resp.text(), 'lxml')
for p in soup.find_all('h3'):
if p.parent.has_attr('href'):
result[p.text] = p.parent.attrs.get('href')
logger.info(f"Google | Searching | {result[p.text]}")
if len(result) > 10:
break
return result
resp.raise_for_status()

110
tools/poster.py Normal file
View File

@ -0,0 +1,110 @@
import json
from time import time
from typing import Any, Dict
from urllib import parse
from bs4 import BeautifulSoup
from loguru import logger
from .sessions import session
async def google_search(content: str) -> Dict[str, str]:
result: Dict[str, str] = {}
async with session.get(
f"https://www.google.com/search?q={parse.quote(content)}", timeout=9.9
) as resp:
if resp.status == 200:
soup = BeautifulSoup(await resp.text(), 'lxml')
for p in soup.find_all('h3'):
if p.parent.has_attr('href'):
result[p.text] = p.parent.attrs.get('href')
logger.info(f"Google | Searching | {result[p.text]}")
if len(result) > 10:
break
return result
resp.raise_for_status()
async def check_ip(ip: str) -> Dict[str, Any]:
# ------------- ip check --------------
url = "https://www.vps234.com"
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36",
'origin': url,
'referer': f'{url}/ipchecker/',
'x-requested-with': 'XMLHttpRequest'
}
async with session.post(
f"{url}/ipcheck/getdata/",
data={
'idName': f'itemblockid{int(round(time() * 1000))}',
'ip': ip,
},
headers=headers
) as resp:
if resp.status == 200:
return await resp.json()
resp.raise_for_status()
async def check_ip_port(ip: str, port: str) -> Dict[str, str]:
# ------------- ip check --------------
url = "https://www.toolsdaquan.com/toolapi/public/ipchecking"
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36",
'referer': 'https://www.toolsdaquan.com/ipcheck/',
'x-requested-with': 'XMLHttpRequest'
}
async with session.post(f"{url}/{ip}/{port}", headers=headers) as resp:
if resp.status == 200:
# inner_data = await resp.json()
inner_data = json.loads(await resp.text())
else:
resp.raise_for_status()
async with session.post(f"{url}2/{ip}/{port}", headers=headers) as resp:
if resp.status == 200:
# outer_data = await resp.json()
outer_data = json.loads(await resp.text())
inner_data.update(outer_data)
else:
resp.raise_for_status()
return inner_data
async def process_check_data(opt: int, resp: Dict[str, Any]) -> str:
print(resp)
if opt == 1:
data = resp.get('data')
if resp.get('error') or not data.get('success'):
return f"⚠️ Api Connection failed. Message is `{resp.get('msg')}`"
_data = data.get('data')
in_icmp = "" if _data.get('innerICMP') else ""
in_tcp = "" if _data.get('innerTCP') else ""
out_icmp = "" if _data.get('outICMP') else ""
out_tcp = "" if _data.get('outTCP') else ""
return f"```Inner ICMP{in_icmp}\n" \
f"Inner TCP {in_tcp}\n" \
f"Outer ICMP{out_icmp}\n" \
f"Outer TCP {out_tcp}```"
elif opt == 2:
def is_opened(key):
return resp.get(key) == 'success'
in_icmp = "" if is_opened('icmp') else ""
in_tcp = "" if is_opened('tcp') else ""
out_icmp = "" if is_opened('outside_icmp') else ""
out_tcp = "" if is_opened('outside_tcp') else ""
return f"```Inner ICMP{in_icmp}\n" \
f"Inner TCP {in_tcp}\n" \
f"Outer ICMP{out_icmp}\n" \
f"Outer TCP {out_tcp}```"