mirror of
https://github.com/TeamPGM/PagerMaid_Plugins_Pyro.git
synced 2024-11-23 00:12:31 +00:00
92 lines
3.2 KiB
Python
92 lines
3.2 KiB
Python
|
import time
|
||
|
import hashlib
|
||
|
from random import randint
|
||
|
|
||
|
from pagermaid.listener import listener
|
||
|
from pagermaid.enums import Message
|
||
|
from pagermaid.services import client as requests
|
||
|
from pagermaid.utils import pip_install
|
||
|
|
||
|
pip_install("tld")
|
||
|
|
||
|
from tld import get_fld
|
||
|
|
||
|
|
||
|
async def post_data(path, data, content, token):
|
||
|
url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/'
|
||
|
client_ip = f'{str(randint(1, 254))}.{str(randint(1, 254))}.{str(randint(1, 254))}.{str(randint(1, 254))}'
|
||
|
headers = {
|
||
|
'Content-Type': content,
|
||
|
'Origin': 'https://beian.miit.gov.cn/',
|
||
|
'Referer': 'https://beian.miit.gov.cn/',
|
||
|
'token': token,
|
||
|
'Client-IP': client_ip,
|
||
|
'X-Forwarded-For': client_ip
|
||
|
}
|
||
|
return await requests.post(url + path, data=data, headers=headers)
|
||
|
|
||
|
|
||
|
async def icp_search(domain):
|
||
|
md5 = hashlib.md5()
|
||
|
timestamp = int(time.time())
|
||
|
auth_key = f'testtest{timestamp}'
|
||
|
md5.update(auth_key.encode('utf-8'))
|
||
|
auth_key = md5.hexdigest()
|
||
|
token = await post_data(
|
||
|
'auth',
|
||
|
f'authKey={auth_key}&timeStamp={timestamp}',
|
||
|
'application/x-www-form-urlencoded;charset=UTF-8',
|
||
|
'0',
|
||
|
)
|
||
|
token = token.json()
|
||
|
if token.get("code", None) == 200:
|
||
|
token = token['params']['bussiness']
|
||
|
else:
|
||
|
return {'isBeian': False, 'msg': '获取token失败'}
|
||
|
|
||
|
query = await post_data('icpAbbreviateInfo/queryByCondition', '{"pageNum":"","pageSize":"","unitName":"%s"}' % (
|
||
|
domain), 'application/json;charset=UTF-8', token)
|
||
|
query = query.json()
|
||
|
if query.get("code", None) != 200:
|
||
|
return {'isBeian': False, 'msg': '查询失败'}
|
||
|
icp_list = query['params']['list']
|
||
|
if len(icp_list) <= 0:
|
||
|
return {'isBeian': False, 'msg': '成功'}
|
||
|
return {'isBeian': True, 'msg': '成功', 'data': icp_list[0]}
|
||
|
|
||
|
|
||
|
@listener(command="icp",
|
||
|
parameters="<域名>",
|
||
|
description="查询域名是否已备案")
|
||
|
async def icp_bei_an(message: Message):
|
||
|
url = None
|
||
|
if message.arguments:
|
||
|
url = message.arguments
|
||
|
elif message.reply_to_message:
|
||
|
url = message.reply_to_message.text
|
||
|
if not url:
|
||
|
return await message.edit("请输入或回复一个域名或链接。")
|
||
|
try:
|
||
|
url = get_fld(url, fix_protocol=True)
|
||
|
message: Message = await message.edit(f"查询中...{url}")
|
||
|
except Exception:
|
||
|
return await message.edit("出错了呜呜呜 ~ 无效的参数。")
|
||
|
try:
|
||
|
data = await icp_search(url)
|
||
|
except Exception as e:
|
||
|
return await message.edit(f"出错了呜呜呜 ~ 查询失败。{e}")
|
||
|
|
||
|
if data.get("isBeian", False):
|
||
|
data = data.get("data", {})
|
||
|
await message.edit(
|
||
|
f"域名: {url}\n"
|
||
|
f"主体: {data.get('unitName', '')}\n"
|
||
|
f"备案时间: {data.get('updateRecordTime', '')}\n"
|
||
|
f"备案类型: {data.get('natureName', '')}\n"
|
||
|
f"备案号: {data.get('serviceLicence', '')}\n"
|
||
|
f"是否限制访问: {data.get('limitAccess', '')}")
|
||
|
elif data.get("msg", "") == "成功":
|
||
|
await message.edit(f"域名 {url} 未备案!")
|
||
|
else:
|
||
|
await message.edit("出错了呜呜呜 ~ " + data.get("msg", "") + "。")
|