PagerMaid_Plugins_Pyro/icp/main.py

92 lines
3.2 KiB
Python
Raw Normal View History

import time
import hashlib
from random import randint
from pagermaid.listener import listener
from pagermaid.enums import Message
from pagermaid.services import client as requests
from pagermaid.utils import pip_install
pip_install("tld")
from tld import get_fld
async def post_data(path, data, content, token):
url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/'
client_ip = f'{str(randint(1, 254))}.{str(randint(1, 254))}.{str(randint(1, 254))}.{str(randint(1, 254))}'
headers = {
'Content-Type': content,
'Origin': 'https://beian.miit.gov.cn/',
'Referer': 'https://beian.miit.gov.cn/',
'token': token,
'Client-IP': client_ip,
'X-Forwarded-For': client_ip
}
return await requests.post(url + path, data=data, headers=headers)
async def icp_search(domain):
md5 = hashlib.md5()
timestamp = int(time.time())
auth_key = f'testtest{timestamp}'
md5.update(auth_key.encode('utf-8'))
auth_key = md5.hexdigest()
token = await post_data(
'auth',
f'authKey={auth_key}&timeStamp={timestamp}',
'application/x-www-form-urlencoded;charset=UTF-8',
'0',
)
token = token.json()
if token.get("code", None) == 200:
token = token['params']['bussiness']
else:
return {'isBeian': False, 'msg': '获取token失败'}
query = await post_data('icpAbbreviateInfo/queryByCondition', '{"pageNum":"","pageSize":"","unitName":"%s"}' % (
domain), 'application/json;charset=UTF-8', token)
query = query.json()
if query.get("code", None) != 200:
return {'isBeian': False, 'msg': '查询失败'}
icp_list = query['params']['list']
if len(icp_list) <= 0:
return {'isBeian': False, 'msg': '成功'}
return {'isBeian': True, 'msg': '成功', 'data': icp_list[0]}
@listener(command="icp",
parameters="<域名>",
description="查询域名是否已备案")
async def icp_bei_an(message: Message):
url = None
if message.arguments:
url = message.arguments
elif message.reply_to_message:
url = message.reply_to_message.text
if not url:
return await message.edit("请输入或回复一个域名或链接。")
try:
url = get_fld(url, fix_protocol=True)
message: Message = await message.edit(f"查询中...{url}")
except Exception:
return await message.edit("出错了呜呜呜 ~ 无效的参数。")
try:
data = await icp_search(url)
except Exception as e:
return await message.edit(f"出错了呜呜呜 ~ 查询失败。{e}")
if data.get("isBeian", False):
data = data.get("data", {})
await message.edit(
f"域名: {url}\n"
f"主体: {data.get('unitName', '')}\n"
f"备案时间: {data.get('updateRecordTime', '')}\n"
f"备案类型: {data.get('natureName', '')}\n"
f"备案号: {data.get('serviceLicence', '')}\n"
f"是否限制访问: {data.get('limitAccess', '')}")
elif data.get("msg", "") == "成功":
await message.edit(f"域名 {url} 未备案!")
else:
await message.edit("出错了呜呜呜 ~ " + data.get("msg", "") + "")