import time import hashlib from random import randint from pagermaid.listener import listener from pagermaid.enums import Message from pagermaid.services import client as requests from pagermaid.utils import pip_install pip_install("tld") from tld import get_fld async def post_data(path, data, content, token): url = 'https://hlwicpfwc.miit.gov.cn/icpproject_query/api/' client_ip = f'{str(randint(1, 254))}.{str(randint(1, 254))}.{str(randint(1, 254))}.{str(randint(1, 254))}' headers = { 'Content-Type': content, 'Origin': 'https://beian.miit.gov.cn/', 'Referer': 'https://beian.miit.gov.cn/', 'token': token, 'Client-IP': client_ip, 'X-Forwarded-For': client_ip } return await requests.post(url + path, data=data, headers=headers) async def icp_search(domain): md5 = hashlib.md5() timestamp = int(time.time()) auth_key = f'testtest{timestamp}' md5.update(auth_key.encode('utf-8')) auth_key = md5.hexdigest() token = await post_data( 'auth', f'authKey={auth_key}&timeStamp={timestamp}', 'application/x-www-form-urlencoded;charset=UTF-8', '0', ) token = token.json() if token.get("code", None) == 200: token = token['params']['bussiness'] else: return {'isBeian': False, 'msg': '获取token失败'} query = await post_data('icpAbbreviateInfo/queryByCondition', '{"pageNum":"","pageSize":"","unitName":"%s"}' % ( domain), 'application/json;charset=UTF-8', token) query = query.json() if query.get("code", None) != 200: return {'isBeian': False, 'msg': '查询失败'} icp_list = query['params']['list'] if len(icp_list) <= 0: return {'isBeian': False, 'msg': '成功'} return {'isBeian': True, 'msg': '成功', 'data': icp_list[0]} @listener(command="icp", parameters="<域名>", description="查询域名是否已备案") async def icp_bei_an(message: Message): url = None if message.arguments: url = message.arguments elif message.reply_to_message: url = message.reply_to_message.text if not url: return await message.edit("请输入或回复一个域名或链接。") try: url = get_fld(url, fix_protocol=True) message: Message = await message.edit(f"查询中...{url}") except Exception: return await message.edit("出错了呜呜呜 ~ 无效的参数。") try: data = await icp_search(url) except Exception as e: return await message.edit(f"出错了呜呜呜 ~ 查询失败。{e}") if data.get("isBeian", False): data = data.get("data", {}) await message.edit( f"域名: {url}\n" f"主体: {data.get('unitName', '')}\n" f"备案时间: {data.get('updateRecordTime', '')}\n" f"备案类型: {data.get('natureName', '')}\n" f"备案号: {data.get('serviceLicence', '')}\n" f"是否限制访问: {data.get('limitAccess', '')}") elif data.get("msg", "") == "成功": await message.edit(f"域名 {url} 未备案!") else: await message.edit("出错了呜呜呜 ~ " + data.get("msg", "") + "。")