video-stream/program/admins.py
2021-11-01 15:13:31 +07:00

288 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Callable
from cache.admins import admins
from driver.veez import call_py
from pyrogram import Client, filters
from driver.decorators import authorized_users_only
from driver.filters import command, other_filters
from driver.queues import QUEUE, clear_queue
from driver.utils import skip_current_song, skip_item
from config import BOT_USERNAME, GROUP_SUPPORT, IMG_3, UPDATES_CHANNEL
from pyrogram.types import (
CallbackQuery,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
)
def admin_only(func: Callable) -> Callable:
    """Restrict a callback-query handler to the chat's cached administrators.

    The wrapped handler runs only when the id of the user who pressed the
    button is listed in ``admins`` for the chat the button's message belongs
    to. Anyone else receives an alert and the handler is not called.

    Args:
        func: Coroutine function taking ``(client, cb)``.

    Returns:
        A coroutine function with the same call signature as ``func``. It
        returns whatever ``func`` returns, or ``None`` when access is denied.
    """
    # Imported locally so this block does not rely on the module import list.
    from functools import wraps

    @wraps(func)  # keep the handler's __name__ / __doc__ on the wrapper
    async def decorator(client, cb):
        # A chat with no cache entry yields None from ``.get``; fall back to
        # an empty tuple so the membership test denies access instead of
        # raising TypeError.
        allowed = admins.get(cb.message.chat.id) or ()
        if cb.from_user.id in allowed:
            return await func(client, cb)
        await cb.answer("💡 only admin can tap this button !", show_alert=True)
        return None

    return decorator
# One-button keyboard attached to successful callback replies; the button
# sends the "cbmenu" callback.
bttn = InlineKeyboardMarkup(
    [
        [
            InlineKeyboardButton(text="🔙 Go Back", callback_data="cbmenu"),
        ],
    ]
)

# One-button keyboard attached to error / "nothing in streaming" replies; the
# button sends the "cls" callback.
bcl = InlineKeyboardMarkup(
    [
        [
            InlineKeyboardButton(text="🗑 Close", callback_data="cls"),
        ],
    ]
)
@Client.on_message(command(["reload", f"reload@{BOT_USERNAME}"]) & other_filters)
@authorized_users_only
async def update_admin(client, message):
    """Rebuild the cached administrator id list for the current chat.

    Fetches the chat members filtered to administrators, stores their user
    ids in ``admins`` under the chat id, then confirms with a reply.
    """
    members = await client.get_chat_members(message.chat.id, filter="administrators")
    # ``admins`` is only mutated in place, never rebound.
    admins[message.chat.id] = [member.user.id for member in members]
    await message.reply_text(
        "✅ Bot **reloaded correctly !**\n✅ **Admin list** has been **updated !**"
    )
@Client.on_message(command(["skip", f"skip@{BOT_USERNAME}", "vskip"]) & other_filters)
@authorized_users_only
async def skip(client, m: Message):
    """Skip the current track, or remove chosen positions from the queue.

    Without arguments, the result of ``skip_current_song`` decides the reply:
    ``0`` -> nothing is playing, ``1`` -> the queue is empty, anything else
    is indexed as ``[0]`` (name) and ``[1]`` (link) for the "skipped" caption.

    With arguments (for example ``/skip 2 5``), every decimal token is taken
    as a queue position and handed to ``skip_item``; the reply lists what
    was removed.
    """
    chat_id = m.chat.id

    if len(m.command) < 2:
        op = await skip_current_song(chat_id)
        if op == 0:
            await m.reply("❌ nothing is currently playing")
        elif op == 1:
            await m.reply("✅ __Queues__ is empty.\n\n• userbot leaving voice chat")
        else:
            keyboard = InlineKeyboardMarkup(
                [
                    [
                        InlineKeyboardButton(
                            text="• Mᴇɴ", callback_data="cbmenu"
                        ),
                        InlineKeyboardButton(
                            text="• Cʟsᴇ", callback_data="cls"
                        ),
                    ]
                ]
            )
            await m.reply_photo(
                photo=f"{IMG_3}",
                caption=f"⏭ **Skipped to the next track.**\n\n🏷 **Name:** [{op[0]}]({op[1]})\n💭 **Chat:** `{chat_id}`\n💡 **Status:** `Playing`\n🎧 **Request by:** {m.from_user.mention()}",
                reply_markup=keyboard,
            )
        return

    if chat_id not in QUEUE:
        # Give the same feedback the other control commands give when the
        # chat has no queue.
        await m.reply("❌ **nothing in streaming**")
        return

    # Use the already-parsed command tokens rather than ``m.text``.
    # ``isdecimal`` (not ``isdigit``) guarantees ``int()`` accepts the token;
    # characters such as "²" pass ``isdigit`` but make ``int()`` raise.
    # The set drops repeated positions so one song is never removed twice;
    # descending order keeps lower positions valid while higher ones go.
    positions = sorted(
        {int(token) for token in m.command[1:] if token.isdecimal()},
        reverse=True,
    )

    report = ["🗑 **removed song from queue:**"]
    for position in positions:
        if position == 0:
            # Position 0 is never handed to skip_item (presumably the track
            # that is currently playing - confirm against driver.queues).
            continue
        removed = await skip_item(chat_id, position)
        if removed == 0:
            # A result of 0 is left out of the report.
            continue
        report.append(f"**#{position}** - {removed}")
    await m.reply("\n".join(report))
@Client.on_message(
    command(["stop", f"stop@{BOT_USERNAME}", "end", f"end@{BOT_USERNAME}", "vstop"])
    & other_filters
)
@authorized_users_only
async def stop(client, m: Message):
    """Leave the group call and clear the chat's queue.

    Replies with a confirmation, with the exception text if any step raises,
    or with a notice when the chat has no entry in ``QUEUE``.
    """
    chat = m.chat.id
    if chat not in QUEUE:
        await m.reply("❌ **nothing in streaming**")
        return

    try:
        await call_py.leave_group_call(chat)
        clear_queue(chat)
        await m.reply("✅ **streaming has ended.**")
    except Exception as err:
        await m.reply(f"🚫 **error:**\n\n`{err}`")
@Client.on_message(
    command(["pause", f"pause@{BOT_USERNAME}", "vpause"]) & other_filters
)
@authorized_users_only
async def pause(client, m: Message):
    """Pause the stream in the current chat.

    Replies with a confirmation, with the exception text if the call raises,
    or with a notice when the chat has no entry in ``QUEUE``.
    """
    chat = m.chat.id
    if chat not in QUEUE:
        await m.reply("❌ **nothing in streaming**")
        return

    try:
        await call_py.pause_stream(chat)
        await m.reply(
            "⏸ **Track paused.**\n\n• **To resume the stream, use the**\n» /resume command."
        )
    except Exception as err:
        await m.reply(f"🚫 **error:**\n\n`{err}`")
@Client.on_message(
    command(["resume", f"resume@{BOT_USERNAME}", "vresume"]) & other_filters
)
@authorized_users_only
async def resume(client, m: Message):
    """Resume the stream in the current chat.

    Replies with a confirmation, with the exception text if the call raises,
    or with a notice when the chat has no entry in ``QUEUE``.
    """
    chat = m.chat.id
    if chat not in QUEUE:
        await m.reply("❌ **nothing in streaming**")
        return

    try:
        await call_py.resume_stream(chat)
        await m.reply(
            "▶️ **Track resumed.**\n\n• **To pause the stream, use the**\n» /pause command."
        )
    except Exception as err:
        await m.reply(f"🚫 **error:**\n\n`{err}`")
@Client.on_message(
    command(["mute", f"mute@{BOT_USERNAME}", "vmute"]) & other_filters
)
@authorized_users_only
async def mute(client, m: Message):
    """Mute the stream in the current chat.

    Replies with a confirmation, with the exception text if the call raises,
    or with a notice when the chat has no entry in ``QUEUE``.
    """
    chat = m.chat.id
    if chat not in QUEUE:
        await m.reply("❌ **nothing in streaming**")
        return

    try:
        await call_py.mute_stream(chat)
        await m.reply(
            "🔇 **Userbot muted.**\n\n• **To unmute the userbot, use the**\n» /unmute command."
        )
    except Exception as err:
        await m.reply(f"🚫 **error:**\n\n`{err}`")
@Client.on_message(
    command(["unmute", f"unmute@{BOT_USERNAME}", "vunmute"]) & other_filters
)
@authorized_users_only
async def unmute(client, m: Message):
    """Unmute the stream in the current chat.

    Replies with a confirmation, with the exception text if the call raises,
    or with a notice when the chat has no entry in ``QUEUE``.
    """
    chat = m.chat.id
    if chat not in QUEUE:
        await m.reply("❌ **nothing in streaming**")
        return

    try:
        await call_py.unmute_stream(chat)
        await m.reply(
            "🔊 **Userbot unmuted.**\n\n• **To mute the userbot, use the**\n» /mute command."
        )
    except Exception as err:
        await m.reply(f"🚫 **error:**\n\n`{err}`")
@Client.on_callback_query(filters.regex("cbpause"))
@admin_only
async def cbpause(_, query: CallbackQuery):
    """Inline-button handler: pause the stream in the button's chat.

    Edits the button's message to show the outcome: success, the exception
    text, or a notice when the chat has no entry in ``QUEUE``.
    """
    chat = query.message.chat.id
    if chat not in QUEUE:
        await query.edit_message_text("❌ **nothing in streaming**", reply_markup=bcl)
        return

    try:
        await call_py.pause_stream(chat)
        await query.edit_message_text("⏸ streaming is paused", reply_markup=bttn)
    except Exception as err:
        await query.edit_message_text(f"🚫 **error:**\n\n`{err}`", reply_markup=bcl)
@Client.on_callback_query(filters.regex("cbresume"))
@admin_only
async def cbresume(_, query: CallbackQuery):
    """Inline-button handler: resume the stream in the button's chat.

    Edits the button's message to show the outcome: success, the exception
    text, or a notice when the chat has no entry in ``QUEUE``.
    """
    chat = query.message.chat.id
    if chat not in QUEUE:
        await query.edit_message_text("❌ **nothing in streaming**", reply_markup=bcl)
        return

    try:
        await call_py.resume_stream(chat)
        await query.edit_message_text("▶️ streaming is resumed", reply_markup=bttn)
    except Exception as err:
        await query.edit_message_text(f"🚫 **error:**\n\n`{err}`", reply_markup=bcl)
@Client.on_callback_query(filters.regex("cbstop"))
@admin_only
async def cbstop(_, query: CallbackQuery):
    """Inline-button handler: leave the group call and clear the chat's queue.

    Edits the button's message to show the outcome: success, the exception
    text, or a notice when the chat has no entry in ``QUEUE``.
    """
    chat = query.message.chat.id
    if chat not in QUEUE:
        await query.edit_message_text("❌ **nothing in streaming**", reply_markup=bcl)
        return

    try:
        await call_py.leave_group_call(chat)
        clear_queue(chat)
        await query.edit_message_text("✅ **streaming has ended**", reply_markup=bcl)
    except Exception as err:
        await query.edit_message_text(f"🚫 **error:**\n\n`{err}`", reply_markup=bcl)
@Client.on_callback_query(filters.regex("cbmute"))
@admin_only
async def cbmute(_, query: CallbackQuery):
    """Inline-button handler: mute the stream in the button's chat.

    Edits the button's message to show the outcome: success, the exception
    text, or a notice when the chat has no entry in ``QUEUE``.
    """
    chat = query.message.chat.id
    if chat not in QUEUE:
        await query.edit_message_text("❌ **nothing in streaming**", reply_markup=bcl)
        return

    try:
        await call_py.mute_stream(chat)
        await query.edit_message_text("🔇 userbot succesfully muted", reply_markup=bttn)
    except Exception as err:
        await query.edit_message_text(f"🚫 **error:**\n\n`{err}`", reply_markup=bcl)
@Client.on_callback_query(filters.regex("cbunmute"))
@admin_only
async def cbunmute(_, query: CallbackQuery):
    """Inline-button handler: unmute the stream in the button's chat.

    Edits the button's message to show the outcome: success, the exception
    text, or a notice when the chat has no entry in ``QUEUE``.
    """
    chat = query.message.chat.id
    if chat not in QUEUE:
        await query.edit_message_text("❌ **nothing in streaming**", reply_markup=bcl)
        return

    try:
        await call_py.unmute_stream(chat)
        await query.edit_message_text("🔊 userbot succesfully unmuted", reply_markup=bttn)
    except Exception as err:
        await query.edit_message_text(f"🚫 **error:**\n\n`{err}`", reply_markup=bcl)
@Client.on_message(
    command(["volume", f"volume@{BOT_USERNAME}", "vol"]) & other_filters
)
@authorized_users_only
async def change_volume(client, m: Message):
    """Set the stream volume for the current chat.

    The first command argument is converted with ``int()`` and passed to
    ``call_py.change_volume_call``. Replies with a usage hint when the
    argument is missing, with the exception text if the conversion or the
    call raises, or with a notice when the chat has no entry in ``QUEUE``.
    """
    # A bare "/volume" has no second token; answer with a hint rather than
    # letting ``m.command[1]`` raise IndexError outside any handler.
    if len(m.command) < 2:
        await m.reply("💡 **usage:** `/volume <number>`")
        return

    level = m.command[1]  # named ``level`` to avoid shadowing builtin ``range``
    chat_id = m.chat.id
    if chat_id in QUEUE:
        try:
            await call_py.change_volume_call(chat_id, volume=int(level))
            await m.reply(
                f"✅ **volume set to** `{level}`%"
            )
        except Exception as e:
            await m.reply(f"🚫 **error:**\n\n`{e}`")
    else:
        await m.reply("❌ **nothing in streaming**")