""" PagerMaid module that contains utilities related to system status. """

from os import remove, popen
from datetime import datetime
from speedtest import Speedtest
from telethon import functions
from platform import python_version, uname
from wordcloud import WordCloud
from telethon import version as telethon_version
from sys import platform
from re import sub
from pathlib import Path
from pagermaid import log, config, redis_status
from pagermaid.utils import execute, upload_attachment
from pagermaid.listener import listener

# Known failure outputs of the framebuffer screenshot pipeline, mapped to the
# message shown to the user. Both the legacy `convert` and the current
# `magick` ImageMagick entry points are recognised.
_FBCON_ERRORS = {
    "/bin/sh: fbdump: command not found":
        "出错了呜呜呜 ~ 此系统上没有 fbdump",
    "/bin/sh: convert: command not found":
        "出错了呜呜呜 ~ ImageMagick 在该系统上不存在。",
    "/bin/sh: magick: command not found":
        "出错了呜呜呜 ~ ImageMagick 在该系统上不存在。",
    "Failed to open /dev/fb0: Permission denied":
        "出错了呜呜呜 ~ 运行 PagerMaid 的用户不在视频组中。",
}


def _remove_if_exists(path):
    """ Deletes a file, ignoring the case where it was never created. """
    try:
        remove(path)
    except FileNotFoundError:
        pass


@listener(outgoing=True, command="sysinfo",
          description="通过 neofetch 检索系统信息。")
async def sysinfo(context):
    """ Retrieve system information via neofetch. """
    await context.edit("加载系统信息中 . . .")
    result = await execute("neofetch --config none --stdout")
    await context.edit(f"`{result}`")


@listener(outgoing=True, command="fortune",
          description="读取 fortune cookies 信息。")
async def fortune(context):
    """ Reads a fortune cookie. """
    result = await execute("fortune")
    if result == "/bin/sh: fortune: command not found":
        await context.edit("`出错了呜呜呜 ~ 此系统上没有 fortune cookies`")
        return
    await context.edit(result)


@listener(outgoing=True, command="fbcon",
          description="拍摄当前绑定的帧缓冲控制台的屏幕截图。")
async def tty(context):
    """ Screenshots a TTY and prints it. """
    await context.edit("拍摄帧缓冲控制台的屏幕截图中 . . .")
    reply_id = context.message.reply_to_msg_id
    result = await execute("fbdump | magick - image.png")

    error_message = _FBCON_ERRORS.get(result)
    if error_message is not None:
        await context.edit(error_message)
        # The pipeline may have failed before writing anything, so the file
        # is not guaranteed to exist here.
        _remove_if_exists("image.png")
        return

    if not await upload_attachment("image.png",
                                   context.chat_id, reply_id,
                                   caption="绑定的帧缓冲区的屏幕截图。",
                                   preview=False, document=False):
        await context.edit("出错了呜呜呜 ~ 由于发生意外错误,导致文件生成失败。")
        # Do not leave a partial screenshot behind on failure.
        _remove_if_exists("image.png")
        return

    await context.delete()
    _remove_if_exists("image.png")
    await log("Screenshot of binded framebuffer console taken.")


@listener(outgoing=True, command="status",
          description="输出 PagerMaid 的状态。")
async def status(context):
    """ Outputs host, runtime and database status of PagerMaid. """
    database = "Connected" if redis_status() else "Disconnected"
    await context.edit(
        f"**PagerMaid 状态** \n"
        f"主机名: `{uname().node}` \n"
        f"主机平台: `{platform}` \n"
        f"Kernel 版本: `{uname().release}` \n"
        f"Python 版本: `{python_version()}` \n"
        f"Library 版本: `{telethon_version.__version__}` \n"
        f"数据库状态: `{database}`"
    )


@listener(outgoing=True, command="speedtest",
          description="执行 speedtest 脚本并输出您的互联网速度。")
async def speedtest(context):
    """ Tests internet speed using speedtest. """
    await context.edit("执行测试脚本 . . .")
    # NOTE: the Speedtest calls below are invoked synchronously (no await),
    # so this handler does not yield to the event loop until they finish.
    test = Speedtest()
    test.get_best_server()
    test.download()
    test.upload()
    test.results.share()
    result = test.results.dict()
    await context.edit(
        f"**Speedtest** \n"
        f"Upload: `{unit_convert(result['upload'])}` \n"
        f"Download: `{unit_convert(result['download'])}` \n"
        f"Latency: `{result['ping']}` \n"
        f"Timestamp: `{result['timestamp']}`"
    )


@listener(outgoing=True, command="connection",
          description="显示 PagerMaid 和 Telegram 之间的连接信息。")
async def connection(context):
    """ Displays connection information between PagerMaid and Telegram. """
    datacenter = await context.client(functions.help.GetNearestDcRequest())
    await context.edit(
        f"**连接信息** \n"
        f"国家: `{datacenter.country}` \n"
        f"连接到的数据中心: `{datacenter.this_dc}` \n"
        f"最近的数据中心: `{datacenter.nearest_dc}`"
    )


@listener(outgoing=True, command="ping",
          description="计算 PagerMaid 和 Telegram 之间的延迟。")
async def ping(context):
    """ Calculates latency between PagerMaid and Telegram. """
    start = datetime.now()
    await context.edit("Pong!")
    end = datetime.now()
    # total_seconds() covers the whole interval; the `.microseconds`
    # attribute alone drops every full second of a slow round trip.
    duration = round((end - start).total_seconds() * 1000, 3)
    await context.edit(f"Pong!|{duration}")


@listener(outgoing=True, command="topcloud",
          description="生成资源占用的词云。")
async def topcloud(context):
    """ Generates a word cloud of resource-hungry processes. """
    await context.edit("生成图片中 . . .")

    # The number of header lines to skip depends on whether /usr/bin/top is
    # a symlink (7 header lines when it is not, 4 when it is).
    header_lines = 4 if Path('/usr/bin/top').is_symlink() else 7
    output = str(await execute("top -b -n 1")).split("\n")[header_lines:]

    command_list = []
    for line in output[:-1]:
        fields = sub(r'\s+', ' ', line).strip().split(" ")
        try:
            # Column 11 is the command; keep only the part before any "/".
            command = fields[11].split("/")[0]
            cpu = float(fields[8].replace(",", "."))
            mem = float(fields[9].replace(",", "."))
        except (IndexError, ValueError):
            # Short or non-numeric rows are not process entries; skip them.
            continue
        if command != "top":
            command_list.append((command, cpu, mem))

    # Sum CPU and memory usage per command name (first sighting is offset
    # by one on each axis, as before).
    command_dict = {}
    for command, cpu, mem in command_list:
        if command in command_dict:
            command_dict[command][0] += cpu
            command_dict[command][1] += mem
        else:
            command_dict[command] = [cpu + 1, mem + 1]

    # Word weight is the Euclidean norm of the (cpu, mem) pair.
    resource_dict = {
        command: (cpu ** 2 + mem ** 2) ** 0.5
        for command, (cpu, mem) in command_dict.items()
    }

    # Prefer the active X screen resolution; fall back to configured size.
    width, height = None, None
    try:
        with popen("xrandr | grep '*'") as pipe:
            raw_width, raw_height = pipe.read().split()[0].split("x")
        width, height = int(raw_width), int(raw_height)
    except (IndexError, ValueError, OSError):
        width, height = None, None
    if not width or not height:
        width = int(config['width'])
        height = int(config['height'])

    background = config['background']
    margin = int(config['margin'])
    cloud = WordCloud(
        background_color=background,
        width=width - 2 * margin,
        height=height - 2 * margin
    ).generate_from_frequencies(resource_dict)
    cloud.to_file("cloud.png")

    await context.edit("上传图片中 . . .")
    try:
        await context.client.send_file(
            context.chat_id,
            "cloud.png",
            reply_to=None,
            caption="正在运行的进程。"
        )
    finally:
        # Always clean up the temporary image, even if the upload fails.
        _remove_if_exists("cloud.png")
    await context.delete()
    await log("生成了一张资源占用的词云。")


def unit_convert(byte):
    """ Converts byte into readable formats. """
    power = 2 ** 10
    zero = 0
    units = {
        0: '',
        1: 'Kb/s',
        2: 'Mb/s',
        3: 'Gb/s',
        4: 'Tb/s'}
    largest = len(units) - 1
    # Stop at the largest known unit so huge values cannot index past the
    # table and raise KeyError.
    while byte > power and zero < largest:
        byte /= power
        zero += 1
    return f"{round(byte, 2)} {units[zero]}"