213 lines
7.0 KiB
Python
213 lines
7.0 KiB
Python
"""
Video + Music Stream Telegram Bot
Copyright (c) 2022-present levina=lab <https://github.com/levina-lab>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but without any warranty; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/licenses.html>
"""
|
|
|
|
|
|
import re
|
|
import sys
|
|
import subprocess
|
|
import traceback
|
|
|
|
from time import time
|
|
from io import StringIO
|
|
from inspect import getfullargspec
|
|
|
|
from config import BOT_USERNAME as bname
|
|
from driver.core import bot
|
|
from driver.queues import QUEUE
|
|
from driver.filters import command
|
|
from driver.database.dbchat import remove_served_chat
|
|
from driver.decorators import bot_creator, sudo_users_only, errors
|
|
from driver.utils import remove_if_exists
|
|
|
|
from pyrogram import Client, filters
|
|
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup
|
|
|
|
|
|
async def aexec(code, client, message):
    """Run ``code`` as the body of a coroutine and return its result.

    The snippet is wrapped in ``async def __aexec(client, message)``, so it
    may use ``await`` and ``return`` and can refer to ``client`` and
    ``message`` by name.

    SECURITY: this executes arbitrary Python source. Only call it with input
    from fully trusted users.

    Args:
        code: Python source text; may span several lines.
        client: Passed through to the generated coroutine as ``client``.
        message: Passed through to the generated coroutine as ``message``.

    Returns:
        Whatever the snippet returns (``None`` if it has no ``return``).

    Raises:
        SyntaxError: If the wrapped snippet does not compile.
        Exception: Anything raised by the snippet itself propagates.
    """
    # Indent every line by one space so it becomes the function body.
    body = "".join(f"\n {line}" for line in code.split("\n"))
    # Use an explicit namespace: reading the function back through locals()
    # after a bare exec() is unspecified and stops working on Python 3.13+,
    # where exec()/locals() operate on independent snapshots.
    namespace = {}
    # Module globals are kept so the snippet sees this module's imports.
    exec("async def __aexec(client, message): " + body, globals(), namespace)
    return await namespace["__aexec"](client, message)
|
|
|
|
async def edit_or_reply(msg: "Message", **kwargs):
    """Edit ``msg`` if it was sent by this account, otherwise reply to it.

    Only the keyword arguments that the chosen method actually declares are
    forwarded; unknown keys are silently dropped.

    Args:
        msg: The message to edit or reply to.
        **kwargs: Candidate keyword arguments (e.g. ``text``,
            ``reply_markup``) for ``msg.edit_text`` / ``msg.reply``.
    """
    sender = msg.from_user
    # from_user may be None on some messages (pyrogram documents it as
    # optional, e.g. for channel posts — TODO confirm); such a message was
    # not sent by us, so fall back to replying instead of raising.
    sent_by_self = sender is not None and sender.is_self
    func = msg.edit_text if sent_by_self else msg.reply
    # Inspect the undecorated function when the method is wrapped; fall back
    # to the method itself if no ``__wrapped__`` attribute is present.
    spec = getfullargspec(getattr(func, "__wrapped__", func))
    accepted = set(spec.args) | set(spec.kwonlyargs)
    await func(**{k: v for k, v in kwargs.items() if k in accepted})
|
|
|
|
|
|
def _runtime_keyboard(seconds):
    """Build the one-button keyboard whose callback data carries the runtime."""
    return InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    text="⏳",
                    # Rounded so the callback data stays short and both
                    # output paths report the runtime the same way.
                    callback_data=f"runtime {round(seconds, 3)} seconds",
                )
            ]
        ]
    )


@Client.on_message(command(["eval", f"eval{bname}"]) & ~filters.edited)
@sudo_users_only
async def executor(client, message):
    """Handle ``/eval``: run the given Python snippet and report its output.

    The snippet is run through ``aexec`` with ``sys.stdout`` / ``sys.stderr``
    temporarily redirected to in-memory buffers. The reported result is, in
    order of preference: the traceback of a raised exception, captured
    stderr, captured stdout, or the literal ``SUCCESS``. Results that do not
    fit in a 4096-character message are sent as ``output.txt`` instead.

    SECURITY: executes arbitrary code supplied in the message text.

    Args:
        client: The client passed through to the evaluated snippet.
        message: The incoming command message.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="» Give a command to execute")
    try:
        # Split on any whitespace so "/eval\n<code>" keeps the whole snippet;
        # splitting on a single space would cut at the first space inside
        # the code instead.
        cmd = message.text.split(None, 1)[1]
    except IndexError:
        return await message.delete()
    t1 = time()
    old_stdout, old_stderr = sys.stdout, sys.stderr
    redirected_output = sys.stdout = StringIO()
    redirected_error = sys.stderr = StringIO()
    exc = None
    try:
        await aexec(cmd, client, message)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always undo the redirection, even for BaseException subclasses
        # that the handler above does not catch.
        sys.stdout = old_stdout
        sys.stderr = old_stderr
    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()
    evaluation = exc or stderr or stdout or "SUCCESS"
    final_output = f"`OUTPUT:`\n\n```{evaluation.strip()}```"
    if len(final_output) > 4096:
        filename = "output.txt"
        with open(filename, "w+", encoding="utf8") as out_file:
            out_file.write(evaluation.strip())
        try:
            await message.reply_document(
                document=filename,
                caption=f"`INPUT:`\n`{cmd[0:980]}`\n\n`OUTPUT:`\n`attached document`",
                quote=False,
                reply_markup=_runtime_keyboard(time() - t1),
            )
            await message.delete()
        finally:
            # Do not leave the temporary file behind if the upload fails.
            remove_if_exists(filename)
    else:
        await edit_or_reply(
            message,
            text=final_output,
            reply_markup=_runtime_keyboard(time() - t1),
        )
|
|
|
|
|
|
@Client.on_callback_query(filters.regex(r"runtime"))
async def runtime_func_cq(_, cq):
    """Answer a "runtime" callback query with an alert popup.

    The text shown is everything in the callback data after its first
    whitespace-separated word.
    """
    pieces = cq.data.split(maxsplit=1)
    await cq.answer(pieces[1], show_alert=True)
|
|
|
|
|
|
# Matches a space that is followed by balanced quotes up to end of line,
# i.e. a space that is not inside a quoted section.
_SHELL_WORD_SPLIT = re.compile(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""")


def _split_shell_words(line):
    """Split ``line`` on spaces outside quotes and drop double-quote chars."""
    return [word.replace('"', "") for word in _SHELL_WORD_SPLIT.split(line)]


def _run_command(argv):
    """Run ``argv`` without a shell and return its decoded stdout.

    A single trailing newline is removed. Raises whatever ``subprocess.Popen``
    raises when the program cannot be started.
    """
    process = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for exit; reading only
    # stdout can block forever once the unread stderr pipe fills up.
    stdout, _ = process.communicate()
    text = stdout.decode("utf-8", errors="replace")
    return text[:-1] if text.endswith("\n") else text


@Client.on_message(command(["sh", f"sh{bname}"]) & ~filters.edited)
@sudo_users_only
async def shellrunner(client, message):
    """Handle ``/sh``: run one command per line and report captured stdout.

    A single-line message runs one command. A multi-line message runs each
    non-blank line in turn and prefixes every result with the line that
    produced it. Output too long for one message is sent as ``output.txt``.

    NOTE: the commands run synchronously via ``subprocess``, so this handler
    blocks while they execute.

    SECURITY: runs arbitrary programs supplied in the message text.

    Args:
        client: Unused; supplied by the handler machinery.
        message: The incoming command message.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="**usage:**\n\n» /sh echo hello world")
    text = message.text.split(None, 1)[1]
    if "\n" in text:
        chunks = []
        for line in text.split("\n"):
            if not line.strip():
                # A blank line is not a command; skip it.
                continue
            try:
                result = _run_command(_split_shell_words(line))
            except Exception as err:
                print(err)
                # Stop here: there is no process output to append for a
                # command that failed to start.
                return await edit_or_reply(message, text=f"`ERROR:`\n\n```{err}```")
            # Label each result with the command line that produced it.
            chunks.append(f"**{line}**\n{result}\n")
        output = "".join(chunks)
    else:
        try:
            output = _run_command(_split_shell_words(text))
        except Exception as err:
            print(err)
            # format_exc() is equivalent to joining format_exception() for
            # the active exception and works on every Python 3 version.
            return await edit_or_reply(
                message, text=f"`ERROR:`\n\n```{traceback.format_exc()}```"
            )
    if not output.strip():
        return await edit_or_reply(message, text="`OUTPUT:`\n\n`no output`")
    reply_text = f"`OUTPUT:`\n\n```{output}```"
    if len(reply_text) > 4096:
        with open("output.txt", "w+", encoding="utf8") as file:
            file.write(output)
        try:
            await bot.send_document(
                message.chat.id,
                "output.txt",
                reply_to_message_id=message.message_id,
                caption="`OUTPUT`",
            )
        finally:
            # Remove the temporary file even when the upload fails.
            remove_if_exists("output.txt")
        return
    await edit_or_reply(message, text=reply_text)
|
|
|
|
|
|
@Client.on_message(command(["leavebot", f"leavebot{bname}"]) & ~filters.edited)
@bot_creator
async def bot_leave_group(_, message):
    """Handle ``/leavebot <chat_id>``: leave the given chat and report back.

    With exactly one argument, both ``bot`` and ``user`` leave the chat and
    ``remove_served_chat`` is called for it; any exception is reported to the
    sender. If the chat is a key of ``QUEUE``, only ``remove_active_chat`` is
    called and the handler returns without replying.
    """
    if len(message.command) != 2:
        await message.reply_text("**usage:**\n\n» /leavebot (`chat_id`)")
        return

    target = message.text.split(None, 2)[1]

    if target in QUEUE:
        # NOTE(review): `remove_active_chat` is not among the imports visible
        # in this file — confirm it is defined, otherwise this raises
        # NameError. Also confirm that returning here without leaving the
        # chat or replying is intended.
        await remove_active_chat(target)
        return

    try:
        await bot.leave_chat(target)
        # NOTE(review): `user` is not among the imports visible in this file
        # — confirm it is defined; a NameError here would be reported to the
        # sender by the handler below.
        await user.leave_chat(target)
        await remove_served_chat(target)
    except Exception as failure:
        await message.reply_text(f"❌ procces failed\n\nreason: `{failure}`")
    else:
        await message.reply_text(
            f"✅ Bot successfully left from the Group:\n\n💭 » `{target}`"
        )
|