PagerMaid-Pyro/pagermaid/modules/status.py
xtaodada a941308d17
Some checks failed
Docker Dev Build / docker build and publish (push) Failing after 13s
Docker Build / docker build and publish (push) Failing after 11s
🔖 Update to v1.5.0
这是一项破坏性变更,目录结构进行了重组,无核心功能变化
2024-09-28 22:01:40 +08:00

240 lines
7.6 KiB
Python

""" PagerMaid module that contains utilities related to system status. """
import re
from datetime import datetime
from getpass import getuser
from platform import uname, python_version
from shutil import disk_usage
from socket import gethostname
from subprocess import Popen, PIPE
from sys import platform
from time import time
from psutil import boot_time, virtual_memory, disk_partitions
from pyrogram import __version__
from pyrogram.enums import ChatType
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.raw.functions import Ping
from pagermaid.common.status import get_bot_uptime
from pagermaid.config import Config
from pagermaid.enums import Client, Message
from pagermaid.listener import listener
from pagermaid.utils import lang, execute
from pagermaid.version import pgm_version
DCs = {
1: "149.154.175.50",
2: "149.154.167.51",
3: "149.154.175.100",
4: "149.154.167.91",
5: "91.108.56.130",
}
@listener(is_plugin=False, command="sysinfo", description=lang("sysinfo_des"))
async def sysinfo(message: Message):
"""Retrieve system information via neofetch."""
if not Config.SILENT:
message = await message.edit(lang("sysinfo_loading"))
if platform == "win32":
return await message.edit(neofetch_win(), parse_mode=ParseMode.HTML)
result = await execute("neofetch --config none --stdout")
await message.edit(f"`{result}`")
@listener(is_plugin=False, command="status", description=lang("status_des"))
async def status(message: Message):
# database
# database = lang('status_online') if redis_status() else lang('status_offline')
# uptime https://gist.github.com/borgstrom/936ca741e885a1438c374824efb038b3
uptime = await get_bot_uptime()
text = (
f"**{lang('status_hint')}** \n"
f"{lang('status_name')}: `{uname().node}` \n"
f"{lang('status_platform')}: `{platform}` \n"
f"{lang('status_release')}: `{uname().release}` \n"
f"{lang('status_python')}: `{python_version()}` \n"
f"{lang('status_pyrogram')}: `{__version__}` \n"
f"{lang('status_pgm')}: `{pgm_version}`\n"
f"{lang('status_uptime')}: `{uptime}`"
)
await message.edit(text)
@listener(is_plugin=False, command="stats", description=lang("stats_des"))
async def stats(client: Client, message: Message):
msg = await message.edit(lang("stats_loading"))
a, u, g, s, c, b = 0, 0, 0, 0, 0, 0
for dialog in await client.get_dialogs_list():
chat_type = dialog.chat.type
if chat_type == ChatType.BOT:
b += 1
elif chat_type == ChatType.PRIVATE:
u += 1
elif chat_type == ChatType.GROUP:
g += 1
elif chat_type == ChatType.SUPERGROUP:
s += 1
elif chat_type == ChatType.CHANNEL:
c += 1
a += 1
text = (
f"**{lang('stats_hint')}** \n"
f"{lang('stats_dialogs')}: `{a}` \n"
f"{lang('stats_private')}: `{u}` \n"
f"{lang('stats_group')}: `{g}` \n"
f"{lang('stats_supergroup')}: `{s}` \n"
f"{lang('stats_channel')}: `{c}` \n"
f"{lang('stats_bot')}: `{b}`"
)
await msg.edit(text)
@listener(is_plugin=False, command="pingdc", description=lang("pingdc_des"))
async def ping_dc(message: Message):
"""Ping your or other data center's IP addresses."""
data = []
print("1")
for dc in range(1, 6):
if platform == "win32":
result = await execute(f'ping -n 1 {DCs[dc]} | find "最短"')
if result := re.findall(r"= (.*?)ms", result or ""):
data.append(result[0])
else:
data.append("0")
else:
result = await execute(
f"ping -c 1 {DCs[dc]} | awk -F 'time=' "
+ "'/time=/ {print $2}' | awk '{print $1}'"
)
try:
data.append(str(float(result)))
except ValueError:
data.append("0")
await message.edit(
f"{lang('pingdc_1')}: `{data[0]}ms`\n"
f"{lang('pingdc_2')}: `{data[1]}ms`\n"
f"{lang('pingdc_3')}: `{data[2]}ms`\n"
f"{lang('pingdc_4')}: `{data[3]}ms`\n"
f"{lang('pingdc_5')}: `{data[4]}ms`"
)
@listener(is_plugin=False, command="ping", description=lang("ping_des"))
async def ping(client: Client, message: Message):
"""Calculates latency between PagerMaid and Telegram."""
start = datetime.now()
await client.invoke(Ping(ping_id=0))
end = datetime.now()
ping_duration = (end - start).microseconds / 1000
start = datetime.now()
message = await message.edit("Pong!")
end = datetime.now()
msg_duration = (end - start).microseconds / 1000
await message.edit(f"Pong!| PING: {ping_duration} | MSG: {msg_duration}")
def wmic(command: str):
"""Fetch the wmic command to cmd"""
try:
p = Popen(command.split(" "), stdout=PIPE)
except FileNotFoundError:
return r"WMIC.exe was not found... Make sure 'C:\Windows\System32\wbem' is added to PATH."
stdout, _ = p.communicate()
output = stdout.decode("gbk", "ignore")
lines = output.split("\r\r")
lines = [g.replace("\n", "").replace(" ", "") for g in lines if len(g) > 2]
return lines
def get_uptime():
"""Get the device uptime"""
delta = round(time() - boot_time())
hours, remainder = divmod(int(delta), 3600)
minutes, seconds = divmod(remainder, 60)
days, hours = divmod(hours, 24)
def include_s(text: str, num: int):
return f"{num} {text}{'' if num == 1 else 's'}"
d = include_s("day", days)
h = include_s("hour", hours)
m = include_s("minute", minutes)
s = include_s("second", seconds)
if days:
output = f"{d}, {h}, {m} and {s}"
elif hours:
output = f"{h}, {m} and {s}"
elif minutes:
output = f"{m} and {s}"
else:
output = s
return output
def readable(num, suffix="B"):
"""Convert Bytes into human-readable formats"""
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, "Yi", suffix)
def get_ram():
"""Get RAM used/free/total"""
ram = virtual_memory()
used = readable(ram.used)
total = readable(ram.total)
percent_used = round(ram.used / ram.total * 100, 2)
return f"{used} / {total} ({percent_used}%)"
def partitions():
"""Find the disk partitions on current OS"""
parts = disk_partitions()
listparts = []
for g in parts:
try:
total, used, _ = disk_usage(g.device)
percent_used = round(used / total * 100, 2)
listparts.append(
f" {g.device[:2]} {readable(used)} / {readable(total)} ({percent_used}%)"
)
except PermissionError:
continue
return listparts
def neofetch_win():
user_name = getuser()
host_name = gethostname()
os = wmic("wmic os get Caption")[-1].replace("Microsoft ", "")
uptime = get_uptime()
mboard_name = wmic("wmic baseboard get Manufacturer")
mboard_module = wmic("wmic baseboard get product")
try:
mboard = f"{mboard_name[-1]} ({mboard_module[-1]})"
except IndexError:
mboard = "Unknown..."
cpu = wmic("wmic cpu get name")[-1]
gpu = wmic("wmic path win32_VideoController get name")
gpu = [f" {g.strip()}" for g in gpu[1:]][0].strip()
ram = get_ram()
disks = "\n".join(partitions())
return (
f"<code>{user_name}@{host_name}\n---------\nOS: {os}\nUptime: {uptime}\n"
f"Motherboard: {mboard}\nCPU: {cpu}\nGPU: {gpu}\nMemory: {ram}\n"
f"Disk:\n{disks}</code>"
)