From efd76e142476a55fa0eecfaf611b940aaa9b06a9 Mon Sep 17 00:00:00 2001 From: levina <82658782+levina-lab@users.noreply.github.com> Date: Sun, 30 Jan 2022 07:32:42 +0700 Subject: [PATCH] create developer plugin --- program/developer.py | 170 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 program/developer.py diff --git a/program/developer.py b/program/developer.py new file mode 100644 index 0000000..03ff098 --- /dev/null +++ b/program/developer.py @@ -0,0 +1,170 @@ +import os +import re +import sys +import shutil +import subprocess +import traceback + +from time import time +from io import StringIO +from sys import version as pyver +from inspect import getfullargspec + +from config import BOT_USERNAME as bname +from driver.filters import command, other_filters +from pyrogram import Client, filters +from driver.decorators import sudo_users_only, error +from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup + + +async def aexec(code, client, message): + exec( + "async def __aexec(client, message): " + + "".join(f"\n {a}" for a in code.split("\n")) + ) + return await locals()["__aexec"](client, message) + + +async def edit_or_reply(msg: Message, **kwargs): + func = msg.edit_text if msg.from_user.is_self else msg.reply + spec = getfullargspec(func.__wrapped__).args + await func(**{k: v for k, v in kwargs.items() if k in spec}) + + +@Client.on_message(command(["eval", f"eval{bname}"]) & other_filters) +@sudo_users_only +async def executor(client, message): + if len(message.command) < 2: + return await edit_or_reply(message, text="» Give a command to execute") + try: + cmd = message.text.split(" ", maxsplit=1)[1] + except IndexError: + return await message.delete() + t1 = time() + old_stderr = sys.stderr + old_stdout = sys.stdout + redirected_output = sys.stdout = StringIO() + redirected_error = sys.stderr = StringIO() + stdout, stderr, exc = None, None, None + try: + await aexec(cmd, client, message) + except Exception: + exc = traceback.format_exc() + stdout = redirected_output.getvalue() + stderr = redirected_error.getvalue() + sys.stdout = old_stdout + sys.stderr = old_stderr + evaluation = "" + if exc: + evaluation = exc + elif stderr: + evaluation = stderr + elif stdout: + evaluation = stdout + else: + evaluation = "SUCCESS" + final_output = f"`OUTPUT:`\n\n```{evaluation.strip()}```" + if len(final_output) > 4096: + filename = "output.txt" + with open(filename, "w+", encoding="utf8") as out_file: + out_file.write(str(evaluation.strip())) + t2 = time() + keyboard = InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="⏳", callback_data=f"runtime {t2-t1} Seconds" + ) + ] + ] + ) + await message.reply_document( + document=filename, + caption=f"`INPUT:`\n`{cmd[0:980]}`\n\n`OUTPUT:`\n`attached document`", + quote=False, + reply_markup=keyboard, + ) + await message.delete() + os.remove(filename) + else: + t2 = time() + keyboard = InlineKeyboardMarkup( + [ + [ + InlineKeyboardButton( + text="⏳", + callback_data=f"runtime {round(t2-t1, 3)} Seconds", + ) + ] + ] + ) + await edit_or_reply(message, text=final_output, reply_markup=keyboard) + + +@Client.on_callback_query(filters.regex(r"runtime")) +async def runtime_func_cq(_, cq): + runtime = cq.data.split(None, 1)[1] + await cq.answer(runtime, show_alert=True) + + +@Client.on_message(command(["sh", f"sh{bname}"]) & other_filters) +@sudo_users_only +async def shellrunner(client, message): + if len(message.command) < 2: + return await edit_or_reply(message, text="**usage:**\n\n» /sh git pull") + text = message.text.split(None, 1)[1] + if "\n" in text: + code = text.split("\n") + output = "" + for x in code: + shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", x) + try: + process = subprocess.Popen( + shell, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + except Exception as err: + print(err) + await edit_or_reply(message, text=f"`ERROR:`\n```{err}```") + output += f"**{code}**\n" + output += process.stdout.read()[:-1].decode("utf-8") + output += "\n" + else: + shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", text) + for a in range(len(shell)): + shell[a] = shell[a].replace('"', "") + try: + process = subprocess.Popen( + shell, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + except Exception as err: + print(err) + exc_type, exc_obj, exc_tb = sys.exc_info() + errors = traceback.format_exception( + etype=exc_type, + value=exc_obj, + tb=exc_tb, + ) + return await edit_or_reply( + message, text=f"`ERROR:`\n```{''.join(errors)}```" + ) + output = process.stdout.read()[:-1].decode("utf-8") + if str(output) == "\n": + output = None + if output: + if len(output) > 4096: + with open("output.txt", "w+") as file: + file.write(output) + await app.send_document( + message.chat.id, + "output.txt", + reply_to_message_id=message.message_id, + caption="`OUTPUT`", + ) + return os.remove("output.txt") + await edit_or_reply(message, text=f"`OUTPUT:`\n```{output}```") + else: + await edit_or_reply(message, text="`OUTPUT:`\n`no output`")