import os
import re
import sys
import shutil
import subprocess
import traceback

from time import time
from io import StringIO
from sys import version as pyver
from inspect import getfullargspec

from config import BOT_USERNAME as bname
from driver.veez import bot
from driver.filters import command
from pyrogram import Client, filters
from driver.decorators import sudo_users_only, errors
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup


# Splits a command line on spaces that are not inside single or double quotes.
_SHELL_SPLIT_PATTERN = re.compile(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""")


async def aexec(code, client, message):
    """Execute ``code`` as the body of an async function and await it.

    The source is wrapped in ``async def __aexec(client, message)`` so the
    snippet can use ``await`` and refer to ``client`` and ``message``.

    SECURITY: this runs arbitrary Python. It must only ever be reachable
    from handlers restricted to trusted users.

    Returns:
        Whatever the wrapped snippet returns.
    """
    source = "async def __aexec(client, message): " + "".join(
        f"\n {a}" for a in code.split("\n")
    )
    # Use an explicit namespace: reading the definition back through
    # ``locals()`` after a bare ``exec()`` no longer works on Python 3.13+.
    namespace = {}
    exec(source, globals(), namespace)
    return await namespace["__aexec"](client, message)


async def edit_or_reply(msg: Message, **kwargs):
    """Edit ``msg`` if it was sent by this account, otherwise reply to it.

    Only the keyword arguments accepted by the chosen method are forwarded;
    the rest are silently dropped.
    """
    sender = msg.from_user
    # ``from_user`` may be None; in that case fall back to replying.
    is_own_message = sender is not None and sender.is_self
    func = msg.edit_text if is_own_message else msg.reply
    # Inspect the undecorated function when the method is wrapped.
    spec = getfullargspec(getattr(func, "__wrapped__", func)).args
    await func(**{k: v for k, v in kwargs.items() if k in spec})


def _runtime_keyboard(elapsed):
    """Build the one-button keyboard whose callback data carries the runtime."""
    return InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="⏳",
                    callback_data=f"runtime {round(elapsed, 3)} Seconds",
                )
            ]
        ]
    )


@Client.on_message(command(["eval", f"eval{bname}"]) & ~filters.edited)
@sudo_users_only
async def executor(client, message):
    """Handle ``/eval``: run the given Python code and report the result.

    The reported text is, in order of preference, the traceback, captured
    stderr, captured stdout, or ``SUCCESS``. Results too long for a single
    message are sent as an ``output.txt`` document instead.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="» Give a command to execute")
    try:
        cmd = message.text.split(" ", maxsplit=1)[1]
    except IndexError:
        return await message.delete()

    t1 = time()
    old_stdout, old_stderr = sys.stdout, sys.stderr
    redirected_output = sys.stdout = StringIO()
    redirected_error = sys.stderr = StringIO()
    exc = None
    try:
        await aexec(cmd, client, message)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always restore the real streams, even if the task is cancelled.
        sys.stdout = old_stdout
        sys.stderr = old_stderr
    elapsed = time() - t1

    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()
    evaluation = exc or stderr or stdout or "SUCCESS"
    final_output = f"`OUTPUT:`\n\n```{evaluation.strip()}```"
    keyboard = _runtime_keyboard(elapsed)

    if len(final_output) <= 4096:
        await edit_or_reply(message, text=final_output, reply_markup=keyboard)
        return

    filename = "output.txt"
    with open(filename, "w+", encoding="utf8") as out_file:
        out_file.write(str(evaluation.strip()))
    try:
        await message.reply_document(
            document=filename,
            caption=f"`INPUT:`\n`{cmd[0:980]}`\n\n`OUTPUT:`\n`attached document`",
            quote=False,
            reply_markup=keyboard,
        )
        await message.delete()
    finally:
        # Do not leave the temporary file behind if the upload fails.
        os.remove(filename)


@Client.on_callback_query(filters.regex(r"runtime"))
async def runtime_func_cq(_, cq):
    """Show the runtime stored in the button's callback data as an alert."""
    parts = cq.data.split(None, 1)
    # Fall back to the raw data when nothing follows the "runtime" keyword.
    runtime = parts[1] if len(parts) > 1 else cq.data
    await cq.answer(runtime, show_alert=True)


def _run_command(line):
    """Run one command line without a shell and return its decoded stdout.

    Double quotes are removed from every argument and a single trailing
    newline is stripped from the output. stderr is captured and discarded.

    NOTE: this blocks until the child process exits.

    Raises:
        OSError: if the process cannot be started (e.g. unknown executable).
    """
    argv = [part.replace('"', "") for part in _SHELL_SPLIT_PATTERN.split(line)]
    process = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and reaps the child, so a chatty
    # stderr cannot fill its pipe and stall the process.
    raw_stdout, _ = process.communicate()
    text = raw_stdout.decode("utf-8", errors="replace")
    return text[:-1] if text.endswith("\n") else text


@Client.on_message(command(["sh", f"sh{bname}"]) & ~filters.edited)
@sudo_users_only
async def shellrunner(client, message):
    """Handle ``/sh``: run one command per input line and report stdout.

    With several lines, each command's output is prefixed by the command
    itself. Output longer than 4096 characters is sent as ``output.txt``.

    SECURITY: this runs arbitrary programs on the host.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="**usage:**\n\n» /sh git pull")
    text = message.text.split(None, 1)[1]

    if "\n" in text:
        sections = []
        for line in text.split("\n"):
            if not line.strip():
                # A blank line is not a command; nothing to run.
                continue
            try:
                result = _run_command(line)
            except Exception as err:
                print(err)
                # Stop here: there is no output to append for this command.
                return await edit_or_reply(
                    message, text=f"`ERROR:`\n```{err}```"
                )
            sections.append(f"**{line}**\n{result}\n")
        output = "".join(sections)
    else:
        try:
            output = _run_command(text)
        except Exception as err:
            print(err)
            details = traceback.format_exc()
            return await edit_or_reply(
                message, text=f"`ERROR:`\n```{details}```"
            )

    if str(output) == "\n":
        output = None
    if not output:
        await edit_or_reply(message, text="`OUTPUT:`\n`no output`")
        return

    if len(output) > 4096:
        with open("output.txt", "w+", encoding="utf8") as file:
            file.write(output)
        try:
            await bot.send_document(
                message.chat.id,
                "output.txt",
                reply_to_message_id=message.message_id,
                caption="`OUTPUT`",
            )
        finally:
            # Do not leave the temporary file behind if the upload fails.
            os.remove("output.txt")
        return
    await edit_or_reply(message, text=f"`OUTPUT:`\n```{output}```")


@Client.on_message(command(["leavebot", f"leavebot{bname}"]) & ~filters.edited)
@sudo_users_only
async def bot_leave_group(_, message):
    """Handle ``/leavebot <chat id>``: make the bot leave the given chat."""
    if len(message.command) != 2:
        await message.reply_text("**usage:**\n\n» /leavebot [chat id]")
        return
    chat = message.text.split(None, 2)[1]
    try:
        await bot.leave_chat(chat)
    except Exception as e:
        await message.reply_text(f"❌ procces failed\n\nreason: `{e}`")
        print(e)
        return
    await message.reply_text(f"✅ Bot successfully left from the Group:\n\n» `{chat}`")