PagerMaid_Plugins_Pyro/icp/main.py
2023-07-01 20:18:58 +08:00

95 lines
3.2 KiB
Python

import hashlib
import json
import time
from random import randint

from pagermaid.listener import listener
from pagermaid.enums import Message
from pagermaid.services import client as requests
from pagermaid.utils import pip_install

pip_install("tld")

from tld import get_fld
async def post_data(path, data, content, token):
    """POST *data* to an endpoint of the MIIT ICP query API.

    Args:
        path: Endpoint path appended to the API base URL.
        data: Request body, passed through unchanged.
        content: Value for the ``Content-Type`` header.
        token: Value for the ``token`` header.

    Returns:
        Whatever the shared HTTP client's ``post`` call resolves to.
    """
    # A freshly randomised dotted-quad (each octet in 1..254) is advertised
    # as the client address on every request.
    # NOTE(review): presumably this is meant to dodge per-IP throttling - confirm.
    fake_ip = ".".join(str(randint(1, 254)) for _ in range(4))
    endpoint = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/" + path
    request_headers = {
        "Content-Type": content,
        "Origin": "https://beian.miit.gov.cn/",
        "Referer": "https://beian.miit.gov.cn/",
        "token": token,
        "Client-IP": fake_ip,
        "X-Forwarded-For": fake_ip,
    }
    response = await requests.post(endpoint, data=data, headers=request_headers)
    return response
async def icp_search(domain):
    """Query the MIIT API for the ICP filing record of *domain*.

    Two requests are issued through ``post_data``: the first obtains a
    token from the ``auth`` endpoint, the second sends *domain* as the
    ``unitName`` condition to ``icpAbbreviateInfo/queryByCondition``.

    Args:
        domain: The domain to look up; it is converted with ``str()``.

    Returns:
        A dict with ``isBeian`` (bool) and ``msg`` (str). When at least one
        record is returned, ``isBeian`` is True and ``data`` holds the first
        record. ``msg`` is "成功" whenever the query itself succeeded.

    Raises:
        KeyError: If a response with code 200 lacks the expected
            ``params`` structure. Errors raised by the HTTP client or by
            JSON decoding of a response also propagate.
    """
    timestamp = int(time.time())
    # The auth endpoint is sent md5("testtest" + timestamp) as authKey.
    auth_key = hashlib.md5(f"testtest{timestamp}".encode("utf-8")).hexdigest()
    auth = await post_data(
        "auth",
        f"authKey={auth_key}&timeStamp={timestamp}",
        "application/x-www-form-urlencoded;charset=UTF-8",
        "0",
    )
    auth = auth.json()
    if auth.get("code") != 200:
        return {"isBeian": False, "msg": "获取token失败"}
    # "bussiness" (sic) is the key exactly as this code reads it from the API.
    token = auth["params"]["bussiness"]

    # Serialise with json.dumps instead of string interpolation so that a
    # quote or backslash in the domain cannot break or inject into the JSON
    # body. Compact separators and ensure_ascii=False keep the payload
    # byte-identical to the previous hand-written form for ordinary input.
    payload = json.dumps(
        {"pageNum": "", "pageSize": "", "unitName": str(domain)},
        ensure_ascii=False,
        separators=(",", ":"),
    )
    query = await post_data(
        "icpAbbreviateInfo/queryByCondition",
        payload,
        "application/json;charset=UTF-8",
        token,
    )
    query = query.json()
    if query.get("code") != 200:
        return {"isBeian": False, "msg": "查询失败"}

    icp_list = query["params"]["list"]
    if not icp_list:
        # The query succeeded but returned no record for this domain.
        return {"isBeian": False, "msg": "成功"}
    return {"isBeian": True, "msg": "成功", "data": icp_list[0]}
@listener(command="icp", parameters="[域名]", description="查询域名是否已备案")
async def icp_bei_an(message: Message):
    """Handle the ``icp`` command: report whether a domain has an ICP filing.

    The target is taken from the command arguments, or failing that from
    the text of the replied-to message. It is reduced to its first-level
    domain with ``get_fld`` before being passed to ``icp_search``; the
    outcome is written back by editing the message.
    """
    # Arguments take precedence; the replied-to message is the fallback.
    target = message.arguments
    if not target and message.reply_to_message:
        target = message.reply_to_message.text
    if not target:
        return await message.edit("请输入或回复一个域名或链接。")

    try:
        url = get_fld(target, fix_protocol=True)
        message = await message.edit(f"查询中...{url}")
    except Exception:
        return await message.edit("出错了呜呜呜 ~ 无效的参数。")

    try:
        result = await icp_search(url)
    except Exception as e:
        return await message.edit(f"出错了呜呜呜 ~ 查询失败。{e}")

    if result.get("isBeian", False):
        record = result.get("data", {})
        text = (
            f"域名: {url}\n"
            f"主体: {record.get('unitName', '')}\n"
            f"备案时间: {record.get('updateRecordTime', '')}\n"
            f"备案类型: {record.get('natureName', '')}\n"
            f"备案号: {record.get('serviceLicence', '')}\n"
            f"是否限制访问: {record.get('limitAccess', '')}"
        )
    elif result.get("msg", "") == "成功":
        # The lookup succeeded but found no filing record.
        text = f"域名 {url} 未备案!"
    else:
        text = "出错了呜呜呜 ~ " + result.get("msg", "")
    await message.edit(text)