PagerMaid-Modify/pagermaid/modules/status.py

211 lines
7.3 KiB
Python
Raw Normal View History

""" PagerMaid module that contains utilities related to system status. """
from os import remove, popen
from datetime import datetime
from speedtest import Speedtest
from telethon import functions
from platform import python_version, uname
from wordcloud import WordCloud
from telethon import version as telethon_version
from sys import platform
from re import sub
from pathlib import Path
from pagermaid import log, config, redis_status
from pagermaid.utils import execute, upload_attachment
from pagermaid.listener import listener
@listener(outgoing=True, command="sysinfo",
description="通过 neofetch 检索系统信息。")
async def sysinfo(context):
""" Retrieve system information via neofetch. """
await context.edit("加载系统信息中 . . .")
result = await execute("neofetch --config none --stdout")
await context.edit(f"`{result}`")
@listener(outgoing=True, command="fortune",
description="读取 fortune cookies 信息。")
async def fortune(context):
""" Reads a fortune cookie. """
result = await execute("fortune")
if result == "/bin/sh: fortune: command not found":
await context.edit("`出错了呜呜呜 ~ 此系统上没有 fortune cookies`")
return
await context.edit(result)
@listener(outgoing=True, command="fbcon",
description="拍摄当前绑定的帧缓冲控制台的屏幕截图。")
async def tty(context):
""" Screenshots a TTY and prints it. """
await context.edit("拍摄帧缓冲控制台的屏幕截图中 . . .")
reply_id = context.message.reply_to_msg_id
result = await execute("fbdump | magick - image.png")
if result == "/bin/sh: fbdump: command not found":
await context.edit("出错了呜呜呜 ~ 此系统上没有 fbdump")
remove("image.png")
return
if result == "/bin/sh: convert: command not found":
await context.edit("出错了呜呜呜 ~ ImageMagick 在该系统上不存在。")
remove("image.png")
return
if result == "Failed to open /dev/fb0: Permission denied":
await context.edit("出错了呜呜呜 ~ 运行 PagerMaid 的用户不在视频组中。")
remove("image.png")
return
if not await upload_attachment("image.png", context.chat_id, reply_id,
caption="绑定的帧缓冲区的屏幕截图。",
preview=False, document=False):
await context.edit("出错了呜呜呜 ~ 由于发生意外错误,导致文件生成失败。")
return
await context.delete()
remove("image.png")
await log("Screenshot of binded framebuffer console taken.")
@listener(outgoing=True, command="status",
description="输出 PagerMaid 的状态。")
async def status(context):
database = "Connected" if redis_status() else "Disconnected"
await context.edit(
f"**PagerMaid 状态** \n"
f"主机名: `{uname().node}` \n"
f"主机平台: `{platform}` \n"
f"Kernel 版本: `{uname().release}` \n"
f"Python 版本: `{python_version()}` \n"
f"Library 版本: `{telethon_version.__version__}` \n"
f"数据库状态: `{'Connected' if redis_status() else 'Disconnected'}`"
)
@listener(outgoing=True, command="speedtest",
description="执行 speedtest 脚本并输出您的互联网速度。")
async def speedtest(context):
""" Tests internet speed using speedtest. """
await context.edit("执行测试脚本 . . .")
test = Speedtest()
test.get_best_server()
test.download()
test.upload()
test.results.share()
result = test.results.dict()
await context.edit(
f"**Speedtest** \n"
f"Upload: `{unit_convert(result['upload'])}` \n"
f"Download: `{unit_convert(result['download'])}` \n"
f"Latency: `{result['ping']}` \n"
f"Timestamp: `{result['timestamp']}`"
)
@listener(outgoing=True, command="connection",
description="显示 PagerMaid 和 Telegram 之间的连接信息。")
async def connection(context):
""" Displays connection information between PagerMaid and Telegram. """
datacenter = await context.client(functions.help.GetNearestDcRequest())
await context.edit(
f"**连接信息** \n"
f"国家: `{datacenter.country}` \n"
f"连接到的数据中心: `{datacenter.this_dc}` \n"
f"最近的数据中心: `{datacenter.nearest_dc}`"
)
@listener(outgoing=True, command="ping",
description="计算 PagerMaid 和 Telegram 之间的延迟。")
async def ping(context):
""" Calculates latency between PagerMaid and Telegram. """
start = datetime.now()
await context.edit("Pong!")
end = datetime.now()
duration = (end - start).microseconds / 1000
await context.edit(f"Pong!|{duration}")
@listener(outgoing=True, command="topcloud",
description="生成资源占用的词云。")
async def topcloud(context):
""" Generates a word cloud of resource-hungry processes. """
await context.edit("生成图片中 . . .")
command_list = []
if not Path('/usr/bin/top').is_symlink():
output = str(await execute("top -b -n 1")).split("\n")[7:]
else:
output = str(await execute("top -b -n 1")).split("\n")[4:]
for line in output[:-1]:
line = sub(r'\s+', ' ', line).strip()
fields = line.split(" ")
try:
if fields[11].count("/") > 0:
command = fields[11].split("/")[0]
else:
command = fields[11]
cpu = float(fields[8].replace(",", "."))
mem = float(fields[9].replace(",", "."))
if command != "top":
command_list.append((command, cpu, mem))
except BaseException:
pass
command_dict = {}
for command, cpu, mem in command_list:
if command in command_dict:
command_dict[command][0] += cpu
command_dict[command][1] += mem
else:
command_dict[command] = [cpu + 1, mem + 1]
resource_dict = {}
for command, [cpu, mem] in command_dict.items():
resource_dict[command] = (cpu ** 2 + mem ** 2) ** 0.5
width, height = None, None
try:
width, height = ((popen("xrandr | grep '*'").read()).split()[0]).split("x")
width = int(width)
height = int(height)
except BaseException:
pass
if not width or not height:
width = int(config['width'])
height = int(config['height'])
background = config['background']
margin = int(config['margin'])
cloud = WordCloud(
background_color=background,
width=width - 2 * int(margin),
height=height - 2 * int(margin)
).generate_from_frequencies(resource_dict)
cloud.to_file("cloud.png")
await context.edit("上传图片中 . . .")
await context.client.send_file(
context.chat_id,
"cloud.png",
reply_to=None,
caption="正在运行的进程。"
)
remove("cloud.png")
await context.delete()
await log("生成了一张资源占用的词云。")
def unit_convert(byte):
""" Converts byte into readable formats. """
power = 2 ** 10
zero = 0
units = {
0: '',
1: 'Kb/s',
2: 'Mb/s',
3: 'Gb/s',
4: 'Tb/s'}
while byte > power:
byte /= power
zero += 1
return f"{round(byte, 2)} {units[zero]}"