PagerMaid-Pyro/pagermaid/modules/system.py

132 lines
3.9 KiB
Python
Raw Normal View History

2022-05-23 12:40:30 +00:00
import io
import sys
import traceback
2022-08-02 04:57:08 +00:00
from os.path import exists, sep
2022-05-23 12:40:30 +00:00
from sys import exit
from platform import node
from getpass import getuser
from pyrogram.enums import ParseMode
2022-05-23 12:40:30 +00:00
from pagermaid.listener import listener
2022-08-02 04:57:08 +00:00
from pagermaid.enums import Message
from pagermaid.services import bot
from pagermaid.utils import attach_log, execute, lang, upload_attachment
2022-05-23 12:40:30 +00:00
@listener(
    is_plugin=False,
    command="sh",
    need_admin=True,
    description=lang('sh_des'),
    parameters=lang('sh_parameters'),
)
async def sh(message: Message):
    """ Use the command-line from Telegram.

    The command text is taken from ``message.arguments`` and handed to
    ``execute``. The triggering message is edited to echo the command and,
    when ``execute`` yields something, to show that result.

    :param message: the message that invoked the command.
    """
    user = getuser()
    command = message.arguments
    hostname = node()

    # Nothing to run: report the argument error and stop.
    if not command:
        await message.edit(lang('arg_error'))
        return

    # Shared first line of both the "running" and the "finished" views.
    prompt = f"`{user}`@{hostname} ~"

    message = await message.edit(f"{prompt}\n> `$` {command}")

    result = await execute(command)
    if not result:
        return

    # Results longer than 4096 characters are passed to attach_log
    # instead of being written into the message text.
    if len(result) > 4096:
        await attach_log(result, message.chat.id, "output.log", message.id)
        return

    await message.edit(f"{prompt}\n> `#` {command}\n`{result}`")
@listener(
    is_plugin=False,
    command="restart",
    need_admin=True,
    description=lang('restart_des'),
)
async def restart(message: Message):
    """ To re-execute PagerMaid.

    Edits the triggering message with the restart notice unless its text
    starts with a letter, then terminates the process with exit status 0.

    :param message: the message that invoked the command.
    """
    first_char = message.text[0]
    starts_with_letter = first_char.isalpha()
    if not starts_with_letter:
        await message.edit(lang('restart_log'))
    exit(0)
@listener(is_plugin=False, command="eval",
          need_admin=True,
          description=lang('eval_des'),
          parameters=lang('eval_parameters'))
async def sh_eval(message: Message):
    """ Run python commands from Telegram.

    The command is only honoured when the marker file ``data/dev`` exists
    and the message text carries code after the first space; otherwise the
    message is edited to ``lang('eval_need_dev')``.

    While the snippet runs, ``sys.stdout`` and ``sys.stderr`` are swapped for
    in-memory buffers. The reported result is, in priority order: the
    formatted traceback, captured stderr, captured stdout, or ``"Success"``.

    :param message: the message that invoked the command.
    """
    # SECURITY: this executes arbitrary Python sent through the chat.
    # The gate must be a real check: an `assert` is stripped under
    # `python -O`, which would silently enable eval outside dev mode.
    dev_mode = exists(f"data{sep}dev")
    if not dev_mode:
        return await message.edit(lang('eval_need_dev'))

    parts = message.text.split(" ", maxsplit=1)
    if len(parts) < 2:
        # No code after the command word.
        return await message.edit(lang('eval_need_dev'))
    cmd = parts[1]

    old_stdout, old_stderr = sys.stdout, sys.stderr
    redirected_output = sys.stdout = io.StringIO()
    redirected_error = sys.stderr = io.StringIO()
    exc = None
    try:
        await aexec(cmd, message, bot)
    except Exception:  # noqa
        exc = traceback.format_exc()
    finally:
        # Always put the real streams back, even when something that is not
        # an Exception (e.g. task cancellation) propagates out of the snippet.
        sys.stdout = old_stdout
        sys.stderr = old_stderr

    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()
    # First non-empty source wins: traceback, stderr, stdout, then a default.
    evaluation = exc or stderr or stdout or "Success"

    final_output = f"**>>>** `{cmd}` \n`{evaluation}`"
    if len(final_output) > 4096:
        # Too long for the message text: keep only the echoed command there
        # and pass the evaluation to attach_log.
        message = await message.edit(f"**>>>** `{cmd}`", parse_mode=ParseMode.MARKDOWN)
        await attach_log(evaluation, message.chat.id, "output.log", message.id)
    else:
        await message.edit(final_output)
2022-08-02 04:57:08 +00:00
@listener(
    is_plugin=False,
    command="send_log",
    need_admin=True,
    description=lang("send_log_des"),
)
async def send_log(message: Message):
    """ Send log to a chat.

    If ``pagermaid.log.txt`` does not exist the message is edited to
    ``lang("send_log_not_found")``. Otherwise the file is passed to
    ``upload_attachment`` for the current chat and the triggering message
    is removed via ``safe_delete``.

    :param message: the message that invoked the command.
    """
    log_path = "pagermaid.log.txt"
    if not exists(log_path):
        return await message.edit(lang("send_log_not_found"))

    chat_id = message.chat.id
    # Prefer the directly replied-to message id, falling back to the
    # top message id of the reply chain.
    reply_id = message.reply_to_message_id or message.reply_to_top_message_id
    logo_path = f"pagermaid{sep}assets{sep}logo.jpg"

    await upload_attachment(
        log_path,
        chat_id,
        reply_id,
        thumb=logo_path,
        caption=lang("send_log_caption"),
    )
    await message.safe_delete()
2022-05-23 12:40:30 +00:00
async def aexec(code, event, client):
    """ Wrap `code` in a coroutine function, run it and return its result.

    The snippet becomes the body of ``async def __aexec(e, client)``, so it
    may use ``await`` and ``return``. These names are pre-bound for it:
    ``e``, ``msg`` and ``message`` (all the event), ``reply``
    (``message.reply_to_message``), ``chat`` (``e.chat``) and ``client``.

    :param code: Python source text; may span several lines.
    :param event: object providing ``reply_to_message`` and ``chat``.
    :param client: handed to the snippet unchanged as ``client``.
    :return: the value returned by the snippet, or ``None`` if it returns nothing.
    """
    # SECURITY: this runs arbitrary source via exec(); callers are
    # responsible for restricting who can reach it.
    header = (
        "async def __aexec(e, client): "
        "\n msg = message = e"
        "\n reply = message.reply_to_message"
        "\n chat = e.chat"
    )
    # Indent every snippet line so it sits inside the generated function.
    body = "".join(f"\n {line}" for line in code.split("\n"))

    # Define the function in an explicit namespace. Relying on exec() writing
    # into this function's implicit locals and reading it back via locals()
    # breaks on Python 3.13+, where each locals() call is an independent
    # snapshot (PEP 667). Module globals are still passed so the snippet
    # resolves global names exactly as before.
    namespace = {}
    exec(header + body, globals(), namespace)
    return await namespace["__aexec"](event, client)