mirror of
https://github.com/Xtao-Labs/iShotaBot.git
synced 2024-11-16 04:35:55 +00:00
✨ Add ip
This commit is contained in:
parent
78627343f8
commit
e98b23a695
23
defs/ip.py
Normal file
23
defs/ip.py
Normal file
@ -0,0 +1,23 @@
|
||||
import contextlib
|
||||
|
||||
|
||||
def ip_info(url, ipinfo_json):
|
||||
ipinfo_list = [f"查询目标: `{url}`"]
|
||||
if ipinfo_json['query'] != url:
|
||||
ipinfo_list.extend(["解析地址: `" + ipinfo_json['query'] + "`"])
|
||||
ipinfo_list.extend(["地区: `" + ipinfo_json['country'] + ' - ' + ipinfo_json['regionName'] + ' - ' +
|
||||
ipinfo_json['city'] + "`"])
|
||||
ipinfo_list.extend(["经纬度: `" + str(ipinfo_json['lat']) + ',' + str(ipinfo_json['lon']) + "`"])
|
||||
ipinfo_list.extend(["ISP: `" + ipinfo_json['isp'] + "`"])
|
||||
if ipinfo_json['org'] != '':
|
||||
ipinfo_list.extend(["组织: `" + ipinfo_json['org'] + "`"])
|
||||
with contextlib.suppress(Exception):
|
||||
ipinfo_list.extend(
|
||||
['[' + ipinfo_json['as'] + '](https://bgp.he.net/' + ipinfo_json['as'].split()[0] + ')'])
|
||||
if ipinfo_json['mobile']:
|
||||
ipinfo_list.extend(['此 IP 可能为**蜂窝移动数据 IP**'])
|
||||
if ipinfo_json['proxy']:
|
||||
ipinfo_list.extend(['此 IP 可能为**代理 IP**'])
|
||||
if ipinfo_json['hosting']:
|
||||
ipinfo_list.extend(['此 IP 可能为**数据中心 IP**'])
|
||||
return '\n'.join(ipinfo_list)
|
72
modules/ip.py
Normal file
72
modules/ip.py
Normal file
@ -0,0 +1,72 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from httpx import get
|
||||
|
||||
from pyrogram import Client, filters
|
||||
from pyrogram.types import Message
|
||||
|
||||
from defs.ip import ip_info
|
||||
from init import user_me
|
||||
|
||||
|
||||
@Client.on_message(filters.incoming & filters.group &
|
||||
filters.command(["ip", f"ip@{user_me.username}"]))
|
||||
async def ip_command(_: Client, message: Message):
|
||||
msg = await message.reply('正在查询中...')
|
||||
rep_text = ''
|
||||
reply = message.reply_to_message
|
||||
if reply:
|
||||
if reply.entities:
|
||||
for num in range(0, len(reply.entities)):
|
||||
url = reply.text[
|
||||
reply.entities[num].offset:reply.entities[num].offset + reply.entities[num].length]
|
||||
url = urlparse(url)
|
||||
if url.hostname or url.path:
|
||||
if url.hostname:
|
||||
url = url.hostname
|
||||
else:
|
||||
url = url.path
|
||||
ipinfo_json = get(
|
||||
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,"
|
||||
"lat,lon,isp,"
|
||||
"org,as,mobile,proxy,hosting,query").json()
|
||||
if ipinfo_json['status'] == 'fail':
|
||||
pass
|
||||
elif ipinfo_json['status'] == 'success':
|
||||
rep_text = ip_info(url, ipinfo_json)
|
||||
text = ''
|
||||
if message.entities:
|
||||
for num in range(0, len(message.entities)):
|
||||
url = message.text[
|
||||
message.entities[num].offset:message.entities[num].offset + message.entities[num].length]
|
||||
url = urlparse(url)
|
||||
if url.hostname or url.path:
|
||||
if url.hostname:
|
||||
url = url.hostname
|
||||
else:
|
||||
url = url.path
|
||||
ipinfo_json = get(
|
||||
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,"
|
||||
"lon,isp,"
|
||||
"org,as,mobile,proxy,hosting,query").json()
|
||||
if ipinfo_json['status'] == 'fail':
|
||||
pass
|
||||
elif ipinfo_json['status'] == 'success':
|
||||
text = ip_info(url, ipinfo_json)
|
||||
if text == '':
|
||||
url = message.text[4:]
|
||||
if not url == '':
|
||||
ipinfo_json = get(
|
||||
"http://ip-api.com/json/" + url + "?fields=status,message,country,regionName,city,lat,"
|
||||
"lon,isp,"
|
||||
"org,as,mobile,proxy,hosting,query").json()
|
||||
if ipinfo_json['status'] == 'fail':
|
||||
pass
|
||||
elif ipinfo_json['status'] == 'success':
|
||||
text = ip_info(url, ipinfo_json)
|
||||
if rep_text == '' and text == '':
|
||||
await msg.edit('没有找到要查询的 ip/域名 ...')
|
||||
elif not rep_text == '' and not text == '':
|
||||
await msg.edit(f'{rep_text}\n================\n{text}')
|
||||
else:
|
||||
await msg.edit(f'{rep_text}{text}')
|
Loading…
Reference in New Issue
Block a user