MibooGram/plugins/system/status.py
2023-12-19 00:51:36 +08:00

88 lines
2.9 KiB
Python

import asyncio
import os
from platform import python_version
from time import time
from typing import TYPE_CHECKING
import psutil
from telegram import __version__
from git import Repo
from git.exc import GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from core.plugin import Plugin, handler
from utils.log import logger
if TYPE_CHECKING:
from telegram import Update
from telegram.ext import ContextTypes
class Status(Plugin):
    """Plugin that answers the ``status`` command with runtime statistics.

    The reply contains the Python / telegram library versions, the short git
    commit hash of the working tree, system and process CPU usage, memory
    usage and the uptime of the bot process.
    """

    def __init__(self):
        # PID of the running bot; used to look up this process in psutil.
        self.pid = os.getpid()
        # Uptime template; the placeholders are substituted by
        # ``human_time_duration`` (months/days hours:minutes).
        self.time_form = "%m/%d %H:%M"

    @staticmethod
    def get_git_hash() -> str:
        """Return the first 8 hex characters of the current HEAD commit.

        Returns:
            The abbreviated commit hash, or a fixed placeholder string when
            the working directory cannot be opened as a git repository.
        """
        try:
            repo = Repo()
        except (InvalidGitRepositoryError, NoSuchPathError, GitCommandError):
            return "非 Git 仓库"
        # A Repo can hold helper git processes and file handles that are only
        # released on close(); this method runs on every status request, so
        # close the repo deterministically instead of waiting for the GC.
        with repo:
            return repo.head.commit.hexsha[:8]

    # NOTE(review): ``block`` and ``admin`` are interpreted by the project's
    # ``handler.command`` decorator — presumably non-blocking dispatch and an
    # admin-only restriction; confirm against core.plugin.
    @handler.command(command="status", block=False, admin=True)
    async def send_log(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE"):
        """Collect runtime statistics and reply to the ``status`` command.

        Args:
            update: Incoming Telegram update that triggered the command.
            _: Handler context (unused).
        """
        user = update.effective_user
        logger.info("用户 %s[%s] status 命令请求", user.full_name, user.id)
        message = update.effective_message

        current_process = psutil.Process(self.pid)
        memory = psutil.virtual_memory()
        total_memory = memory.total
        used_memory = memory.used

        # cpu_percent() without an interval reports usage since the previous
        # call, so prime both counters, wait one second without blocking the
        # event loop, and then read the values for that one-second window.
        psutil.cpu_percent()
        current_process.cpu_percent()
        await asyncio.sleep(1)
        cpu_percent = psutil.cpu_percent()
        process_cpu_use = current_process.cpu_percent()

        process_use = current_process.memory_info()
        start_time = current_process.create_time()

        # Byte counts are divided by 1024**3 and rendered as
        # total / used / resident-set-size of this process.
        gib = 1024 * 1024 * 1024
        memory_text = (
            f"{total_memory / gib:.2f}GB/"
            f"{used_memory / gib:.2f}GB/"
            f"{process_use.rss / gib:.2f}GB"
        )
        text = (
            "PaiGram 运行状态\n"
            f"Python 版本: `{python_version()}` \n"
            f"Telegram 版本: `{__version__}` \n"
            f"GramBot 版本: `{self.get_git_hash()}` \n"
            f"CPU使用率: `{cpu_percent}%/{process_cpu_use}%` \n"
            f"当前使用的内存: `{memory_text}` \n"
            f"运行时间: `{self.get_bot_uptime(start_time)}` \n"
        )
        await message.reply_markdown_v2(text)

    def get_bot_uptime(self, start_time: float) -> str:
        """Format the time elapsed since ``start_time`` using ``self.time_form``.

        Args:
            start_time: Process start time as a Unix timestamp in seconds.

        Returns:
            The elapsed time rendered through ``human_time_duration``.
        """
        uptime_sec = time() - start_time
        return self.human_time_duration(int(uptime_sec), self.time_form)

    @staticmethod
    def human_time_duration(seconds: int, time_form: str) -> str:
        """Render a duration in seconds through a strftime-like template.

        The duration is split greedily into months (counted as 30 days),
        days, hours, minutes and seconds; every unit holds the remainder left
        after the larger units were taken out, whether or not its placeholder
        appears in ``time_form``. Values are not zero-padded.

        Args:
            seconds: Duration in seconds.
            time_form: Template that may contain ``%m``, ``%d``, ``%H``,
                ``%M`` and ``%S`` placeholders.

        Returns:
            ``time_form`` with each placeholder replaced by its unit count.
        """
        time_units = (
            ("%m", 60 * 60 * 24 * 30),
            ("%d", 60 * 60 * 24),
            ("%H", 60 * 60),
            ("%M", 60),
            ("%S", 1),
        )
        remaining = int(seconds)
        for unit, div in time_units:
            amount, remaining = divmod(remaining, div)
            # Substituted values are plain integers, so they can never
            # introduce a new placeholder for a later replacement to hit.
            time_form = time_form.replace(unit, str(amount))
        return time_form