video-stream/program/developer.py

171 lines
5.4 KiB
Python
Raw Normal View History

2022-01-30 00:32:42 +00:00
import os
import re
import sys
import shutil
import subprocess
import traceback
from time import time
from io import StringIO
from sys import version as pyver
from inspect import getfullargspec
from config import BOT_USERNAME as bname
from driver.filters import command, other_filters
from pyrogram import Client, filters
from driver.decorators import sudo_users_only, error
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup
async def aexec(code, client, message):
    """Compile *code* as the body of a coroutine, run it, and return its result.

    The text is wrapped in ``async def __aexec(client, message):`` so the
    snippet may use ``await`` and ``return`` and can refer to ``client`` and
    ``message`` by name.

    SECURITY: this executes arbitrary Python. It must only ever be reachable
    by fully trusted callers.

    Args:
        code: Python source text; each line becomes one line of the body.
        client: Object exposed to the snippet as ``client``.
        message: Object exposed to the snippet as ``message``.

    Returns:
        Whatever the snippet returns (``None`` when it has no ``return``).

    Raises:
        SyntaxError: If the wrapped snippet does not compile.
        Exception: Anything raised by the snippet itself propagates.
    """
    # Every line is prefixed with a newline and one space so it is indented
    # inside the generated function body.
    body = "".join(f"\n {line}" for line in code.split("\n"))
    # Use an explicit namespace: since Python 3.13 each locals() call inside a
    # function returns an independent snapshot, so a name created by a bare
    # exec() can no longer be fetched back through a later locals() call.
    namespace = {}
    exec("async def __aexec(client, message): " + body, globals(), namespace)
    return await namespace["__aexec"](client, message)
async def edit_or_reply(msg: Message, **kwargs):
    """Edit *msg* if this account sent it, otherwise send a reply to it.

    Only the keyword arguments that the chosen method actually accepts are
    forwarded, so callers can pass one set of kwargs for both paths.

    Args:
        msg: The message to edit or reply to.
        **kwargs: Candidate keyword arguments (e.g. ``text``,
            ``reply_markup``); unsupported ones are silently dropped.
    """
    sender = msg.from_user
    # from_user can be missing (NOTE(review): e.g. channel posts — confirm
    # against the Pyrogram docs); such a message cannot be ours, so reply.
    func = msg.edit_text if sender is not None and sender.is_self else msg.reply
    # Inspect the underlying function when the method is a functools-style
    # wrapper; fall back to the method itself when it is not wrapped.
    spec = getfullargspec(getattr(func, "__wrapped__", func)).args
    await func(**{k: v for k, v in kwargs.items() if k in spec})
@Client.on_message(command(["eval", f"eval{bname}"]) & other_filters)
@sudo_users_only
async def executor(client, message):
    """Execute the Python code that follows the eval command and report it.

    The reported text is, in order of priority: the traceback if the code
    raised, otherwise captured stderr, otherwise captured stdout, otherwise
    the literal ``SUCCESS``. Results whose formatted form exceeds 4096
    characters are sent as an ``output.txt`` document instead of a message.
    An inline button carries the measured runtime in its callback data.

    SECURITY: the code is run through ``aexec`` with no sandboxing; access
    control rests entirely on the decorators applied to this handler.

    Args:
        client: The client instance that received the update.
        message: The incoming command message.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="» Give a command to execute")
    try:
        cmd = message.text.split(" ", maxsplit=1)[1]
    except IndexError:
        # There is no space between the command and its argument.
        return await message.delete()

    t1 = time()
    old_stderr = sys.stderr
    old_stdout = sys.stdout
    # NOTE: sys.stdout/sys.stderr are process-wide, so anything printed by
    # other code running while the snippet executes is captured as well.
    redirected_output = sys.stdout = StringIO()
    redirected_error = sys.stderr = StringIO()
    exc = None
    try:
        await aexec(cmd, client, message)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always restore the real streams, even if the handler is cancelled
        # or the snippet raises something that is not an Exception.
        sys.stdout = old_stdout
        sys.stderr = old_stderr
    t2 = time()

    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()
    evaluation = exc or stderr or stdout or "SUCCESS"
    final_output = f"`OUTPUT:`\n\n```{evaluation.strip()}```"

    # NOTE(review): the button label is an empty string in the source; it may
    # have held a character that was lost — confirm the API accepts it.
    keyboard = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="",
                    callback_data=f"runtime {round(t2-t1, 3)} Seconds",
                )
            ]
        ]
    )

    if len(final_output) > 4096:
        filename = "output.txt"
        with open(filename, "w+", encoding="utf8") as out_file:
            out_file.write(str(evaluation.strip()))
        try:
            await message.reply_document(
                document=filename,
                caption=f"`INPUT:`\n`{cmd[0:980]}`\n\n`OUTPUT:`\n`attached document`",
                quote=False,
                reply_markup=keyboard,
            )
        finally:
            # Remove the temporary file whether or not the upload succeeded.
            os.remove(filename)
        await message.delete()
    else:
        await edit_or_reply(message, text=final_output, reply_markup=keyboard)
@Client.on_callback_query(filters.regex(r"runtime"))
async def runtime_func_cq(_, cq):
    """Answer a "runtime" callback query with an alert showing its payload.

    Args:
        _: Unused first handler argument.
        cq: The callback query; its ``data`` is split on whitespace and
            everything after the first word is shown to the user.
    """
    # Drop the leading word and keep the remainder as the alert text.
    pieces = cq.data.split(maxsplit=1)
    await cq.answer(pieces[1], show_alert=True)
# Matches a space that is followed by balanced single/double quotes up to the
# end of the string, i.e. a space that is not inside a quoted segment.
_SHELL_ARG_SPLIT = re.compile(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""")


def _split_shell_args(line):
    """Split *line* into an argv list, removing double-quote characters."""
    return [part.replace('"', "") for part in _SHELL_ARG_SPLIT.split(line)]


def _run_and_capture(argv):
    """Run *argv* without a shell and return its stdout as text."""
    process = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the child, so a chatty
    # stderr cannot block the process and no zombie is left behind.
    # NOTE: this call is blocking and stalls the event loop while it runs.
    raw, _ = process.communicate()
    # Drop a single trailing newline only when one is actually present.
    if raw.endswith(b"\n"):
        raw = raw[:-1]
    return raw.decode("utf-8", errors="replace")


@Client.on_message(command(["sh", f"sh{bname}"]) & other_filters)
@sudo_users_only
async def shellrunner(client, message):
    """Run the command(s) that follow the sh command and report their stdout.

    A single-line argument is run as one command. A multi-line argument is
    run line by line, and each line's output is preceded by the line itself
    in bold. Commands are started directly (no shell), so shell features
    such as pipes or globbing are not available. Output whose formatted form
    exceeds 4096 characters is sent as an ``output.txt`` document.

    SECURITY: this runs arbitrary programs on the host; access control rests
    entirely on the decorators applied to this handler.

    Args:
        client: The client instance that received the update.
        message: The incoming command message.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="**usage:**\n\n» /sh git pull")
    text = message.text.split(None, 1)[1]

    if "\n" in text:
        output = ""
        for line in text.split("\n"):
            if not line.strip():
                # A blank line is not a command.
                continue
            try:
                result = _run_and_capture(_split_shell_args(line))
            except Exception as err:
                print(err)
                await edit_or_reply(message, text=f"`ERROR:`\n```{err}```")
                # Best effort: report the failure and move on to the next line.
                continue
            output += f"**{line}**\n"
            output += result
            output += "\n"
    else:
        try:
            output = _run_and_capture(_split_shell_args(text))
        except Exception as err:
            print(err)
            return await edit_or_reply(
                message, text=f"`ERROR:`\n```{traceback.format_exc()}```"
            )

    if str(output) == "\n":
        output = None
    if not output:
        return await edit_or_reply(message, text="`OUTPUT:`\n`no output`")

    reply_text = f"`OUTPUT:`\n```{output}```"
    # Measure the text that is actually sent, markup included.
    if len(reply_text) > 4096:
        with open("output.txt", "w+", encoding="utf8") as file:
            file.write(output)
        try:
            await message.reply_document(
                document="output.txt",
                quote=True,
                caption="`OUTPUT`",
            )
        finally:
            # Remove the temporary file whether or not the upload succeeded.
            os.remove("output.txt")
        return
    await edit_or_reply(message, text=reply_text)