302 lines
10 KiB
Python
302 lines
10 KiB
Python
""" PagerMaid module that contains utilities related to system status. """
|
|
|
|
from json import loads
|
|
from PIL import Image
|
|
from requests import get
|
|
from os import remove, popen
|
|
from datetime import datetime
|
|
from speedtest import distance, Speedtest, ShareResultsConnectFailure, ShareResultsSubmitFailure, NoMatchedServers, \
|
|
SpeedtestBestServerFailure
|
|
from telethon import functions
|
|
from platform import python_version, uname
|
|
from wordcloud import WordCloud
|
|
from telethon import version as telethon_version
|
|
from sys import platform
|
|
from re import sub, findall
|
|
from pathlib import Path
|
|
from pagermaid import log, config, redis_status
|
|
from pagermaid.utils import execute, upload_attachment
|
|
from pagermaid.listener import listener
|
|
from pagermaid.utils import lang, alias_command
|
|
|
|
DCs = {
|
|
1: "149.154.175.50",
|
|
2: "149.154.167.51",
|
|
3: "149.154.175.100",
|
|
4: "149.154.167.91",
|
|
5: "91.108.56.130"
|
|
}
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("sysinfo"),
|
|
description=lang('sysinfo_des'))
|
|
async def sysinfo(context):
|
|
""" Retrieve system information via neofetch. """
|
|
await context.edit(lang('sysinfo_loading'))
|
|
result = await execute("neofetch --config none --stdout")
|
|
await context.edit(f"`{result}`")
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("fortune"),
|
|
description=lang('fortune_des'))
|
|
async def fortune(context):
|
|
""" Reads a fortune cookie. """
|
|
result = await execute("fortune")
|
|
if result == "/bin/sh: fortune: command not found":
|
|
await context.edit(lang('fortune_not_exist'))
|
|
return
|
|
await context.edit(result)
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("fbcon"),
|
|
description=lang('fbcon_des'))
|
|
async def tty(context):
|
|
""" Screenshots a TTY and prints it. """
|
|
await context.edit(lang('fbcon_processing'))
|
|
reply_id = context.message.reply_to_msg_id
|
|
result = await execute("fbdump | convert - image.png")
|
|
if result == "/bin/sh: fbdump: command not found":
|
|
await context.edit(lang('fbcon_no_fbdump'))
|
|
remove("image.png")
|
|
return
|
|
if result == "/bin/sh: convert: command not found":
|
|
await context.edit(lang('fbcon_no_ImageMagick'))
|
|
remove("image.png")
|
|
return
|
|
if result == "Failed to open /dev/fb0: Permission denied":
|
|
await context.edit(lang('fbcon_no_permission'))
|
|
return
|
|
if not await upload_attachment("./image.png", context.chat_id, reply_id,
|
|
caption=lang('fbcon_caption'),
|
|
preview=False, document=False):
|
|
await context.edit(lang('fbcon_error'))
|
|
return
|
|
await context.delete()
|
|
try:
|
|
remove("./image.png")
|
|
except:
|
|
pass
|
|
await log("Screenshot of binded framebuffer console taken.")
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("status"),
|
|
description=lang('status_des'))
|
|
async def status(context):
|
|
database = lang('status_online') if redis_status() else lang('status_offline')
|
|
text = (f"**{lang('status_hint')}** \n"
|
|
f"{lang('status_name')}: `{uname().node}` \n"
|
|
f"{lang('status_platform')}: `{platform}` \n"
|
|
f"{lang('status_release')}: `{uname().release}` \n"
|
|
f"{lang('status_python')}: `{python_version()}` \n"
|
|
f"{lang('status_telethon')}: `{telethon_version.__version__}` \n"
|
|
f"{lang('status_db')}: `{database}`"
|
|
)
|
|
await context.edit(text)
|
|
dialogs = await context.client.get_dialogs()
|
|
dialogs = len(dialogs)
|
|
text += f"\n{lang('status_dialogs')}: `{dialogs}`"
|
|
await context.edit(text)
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("speedtest"),
|
|
description=lang('speedtest_des'))
|
|
async def speedtest(context):
|
|
""" Tests internet speed using speedtest. """
|
|
test = Speedtest()
|
|
server, server_json = [], False
|
|
if len(context.parameter) == 1:
|
|
try:
|
|
server = [int(context.parameter[0])]
|
|
except ValueError:
|
|
await context.edit(lang('arg_error'))
|
|
return
|
|
elif len(context.parameter) >= 2:
|
|
try:
|
|
temp_json = findall(r'{(.*?)}', context.text.replace("'", '"'))
|
|
if len(temp_json) == 1:
|
|
server_json = loads("{" + temp_json[0] + "}")
|
|
server_json['d'] = distance(test.lat_lon, (float(server_json['lat']), float(server_json['lon'])))
|
|
test.servers = [server_json]
|
|
else:
|
|
await context.edit(lang('arg_error'))
|
|
return
|
|
except:
|
|
pass
|
|
await context.edit(lang('speedtest_processing'))
|
|
try:
|
|
if len(server) == 0:
|
|
if not server_json:
|
|
test.get_best_server()
|
|
else:
|
|
test.get_best_server(servers=test.servers)
|
|
else:
|
|
test.get_servers(servers=server)
|
|
except (SpeedtestBestServerFailure, NoMatchedServers) as e:
|
|
await context.edit(lang('speedtest_ServerFailure'))
|
|
return
|
|
test.download()
|
|
test.upload()
|
|
try:
|
|
test.results.share()
|
|
except (ShareResultsConnectFailure, ShareResultsSubmitFailure) as e:
|
|
await context.edit(lang('speedtest_ConnectFailure'))
|
|
return
|
|
result = test.results.dict()
|
|
des = (
|
|
f"**Speedtest** \n"
|
|
f"Server: `{result['server']['name']} - "
|
|
f"{result['server']['cc']}` \n"
|
|
f"Sponsor: `{result['server']['sponsor']}` \n"
|
|
f"Upload: `{unit_convert(result['upload'])}` \n"
|
|
f"Download: `{unit_convert(result['download'])}` \n"
|
|
f"Latency: `{result['ping']}` \n"
|
|
f"Timestamp: `{result['timestamp']}`"
|
|
)
|
|
# 开始处理图片
|
|
data = get(result['share']).content
|
|
with open('speedtest.png', mode='wb') as f:
|
|
f.write(data)
|
|
try:
|
|
img = Image.open('speedtest.png')
|
|
c = img.crop((17, 11, 727, 389))
|
|
c.save('speedtest.png')
|
|
except:
|
|
pass
|
|
try:
|
|
await context.client.send_file(context.chat_id, 'speedtest.png', caption=des)
|
|
except:
|
|
return
|
|
try:
|
|
remove('speedtest.png')
|
|
except:
|
|
pass
|
|
await context.delete()
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("connection"),
|
|
description=lang('connection_des'))
|
|
async def connection(context):
|
|
""" Displays connection information between PagerMaid and Telegram. """
|
|
datacenter = await context.client(functions.help.GetNearestDcRequest())
|
|
await context.edit(
|
|
f"**{lang('connection_hint')}** \n"
|
|
f"{lang('connection_country')}: `{datacenter.country}` \n"
|
|
f"{lang('connection_dc')}: `{datacenter.this_dc}` \n"
|
|
f"{lang('connection_nearest_dc')}: `{datacenter.nearest_dc}`"
|
|
)
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("pingdc"),
|
|
description=lang('pingdc_des'))
|
|
async def pingdc(context):
|
|
""" Ping your or other data center's IP addresses. """
|
|
data = []
|
|
for dc in range(1, 6):
|
|
result = await execute(f"ping -c 1 {DCs[dc]} | awk -F '/' " + "'END {print $5}'")
|
|
data.append(result)
|
|
await context.edit(
|
|
f"{lang('pingdc_1')}: `{data[0]}ms`\n"
|
|
f"{lang('pingdc_2')}: `{data[1]}ms`\n"
|
|
f"{lang('pingdc_3')}: `{data[2]}ms`\n"
|
|
f"{lang('pingdc_4')}: `{data[3]}ms`\n"
|
|
f"{lang('pingdc_5')}: `{data[4]}ms`"
|
|
)
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("ping"),
|
|
description=lang('ping_des'))
|
|
async def ping(context):
|
|
""" Calculates latency between PagerMaid and Telegram. """
|
|
start = datetime.now()
|
|
await context.edit("Pong!")
|
|
end = datetime.now()
|
|
duration = (end - start).microseconds / 1000
|
|
await context.edit(f"Pong!|{duration}")
|
|
|
|
|
|
@listener(is_plugin=False, outgoing=True, command=alias_command("topcloud"),
|
|
description=lang('topcloud_des'))
|
|
async def topcloud(context):
|
|
""" Generates a word cloud of resource-hungry processes. """
|
|
await context.edit(lang('topcloud_processing'))
|
|
command_list = []
|
|
if not Path('/usr/bin/top').is_symlink():
|
|
output = str(await execute("top -b -n 1")).split("\n")[7:]
|
|
else:
|
|
output = str(await execute("top -b -n 1")).split("\n")[4:]
|
|
for line in output[:-1]:
|
|
line = sub(r'\s+', ' ', line).strip()
|
|
fields = line.split(" ")
|
|
try:
|
|
if fields[11].count("/") > 0:
|
|
command = fields[11].split("/")[0]
|
|
else:
|
|
command = fields[11]
|
|
|
|
cpu = float(fields[8].replace(",", "."))
|
|
mem = float(fields[9].replace(",", "."))
|
|
|
|
if command != "top":
|
|
command_list.append((command, cpu, mem))
|
|
except BaseException:
|
|
pass
|
|
command_dict = {}
|
|
for command, cpu, mem in command_list:
|
|
if command in command_dict:
|
|
command_dict[command][0] += cpu
|
|
command_dict[command][1] += mem
|
|
else:
|
|
command_dict[command] = [cpu + 1, mem + 1]
|
|
|
|
resource_dict = {}
|
|
|
|
for command, [cpu, mem] in command_dict.items():
|
|
resource_dict[command] = (cpu ** 2 + mem ** 2) ** 0.5
|
|
|
|
width, height = None, None
|
|
try:
|
|
width, height = ((popen("xrandr | grep '*'").read()).split()[0]).split("x")
|
|
width = int(width)
|
|
height = int(height)
|
|
except BaseException:
|
|
pass
|
|
if not width or not height:
|
|
width = int(config['width'])
|
|
height = int(config['height'])
|
|
background = config['background']
|
|
margin = int(config['margin'])
|
|
|
|
cloud = WordCloud(
|
|
background_color=background,
|
|
width=width - 2 * int(margin),
|
|
height=height - 2 * int(margin)
|
|
).generate_from_frequencies(resource_dict)
|
|
|
|
cloud.to_file("cloud.png")
|
|
await context.edit(lang('highlight_uploading'))
|
|
await context.client.send_file(
|
|
context.chat_id,
|
|
"cloud.png",
|
|
reply_to=None,
|
|
caption=lang('topcloud_caption')
|
|
)
|
|
remove("cloud.png")
|
|
await context.delete()
|
|
await log(lang('topcloud_success'))
|
|
|
|
|
|
def unit_convert(byte):
|
|
""" Converts byte into readable formats. """
|
|
power = 1000
|
|
zero = 0
|
|
units = {
|
|
0: '',
|
|
1: 'Kb/s',
|
|
2: 'Mb/s',
|
|
3: 'Gb/s',
|
|
4: 'Tb/s'}
|
|
while byte > power:
|
|
byte /= power
|
|
zero += 1
|
|
return f"{round(byte, 2)} {units[zero]}"
|