video-stream/program/admins.py
2022-03-02 21:01:45 +08:00

361 lines
13 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Video + Music Stream Telegram Bot
Copyright (c) 2022-present levina=lab <https://github.com/levina-lab>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but without any warranty; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/licenses.html>
"""
import traceback
from cache.admins import admins
from driver.core import calls
from pyrogram import Client, filters
from driver.design.thumbnail import thumb
from driver.design.chatname import CHAT_TITLE
from driver.queues import QUEUE, clear_queue
from driver.filters import command, other_filters
from driver.decorators import authorized_users_only, check_blacklist
from driver.utils import skip_current_song, skip_item, remove_if_exists
from config import BOT_USERNAME, IMG_5
from driver.database.dbqueue import (
is_music_playing,
remove_active_chat,
music_off,
music_on,
)
from program.utils.inline import stream_markup, close_mark
from pyrogram.types import (
CallbackQuery,
InlineKeyboardMarkup,
Message,
)
@Client.on_message(command(["reload", f"reload@{BOT_USERNAME}"]) & other_filters)
@authorized_users_only
@check_blacklist()
async def update_admin(client, message: Message):
    """Rebuild the cached administrator list for the chat the command came from.

    Fetches the chat's administrators through the client, stores their user
    ids in the shared ``admins`` mapping under the chat id, and confirms the
    refresh with a reply.
    """
    chat_id = message.chat.id
    members = await client.get_chat_members(chat_id, filter="administrators")
    # Only the user ids are cached; the mapping is mutated in place.
    admins[chat_id] = [member.user.id for member in members]
    await message.reply_text("✅ **重载完成!**\n✅ **管理员列表已经刷新!**")
@Client.on_message(
    command(["stop", f"stop@{BOT_USERNAME}", "end", f"end@{BOT_USERNAME}", "vstop"])
    & other_filters
)
@authorized_users_only
@check_blacklist()
async def stop(client, m: Message):
    """Leave the group call of this chat and drop its queue.

    Replies with a "queue is empty" notice when the chat has no queue entry.
    Any exception raised while leaving is printed and reported back to the
    chat.
    """
    chat_id = m.chat.id
    if chat_id not in QUEUE:
        await m.reply("❌ **队列为空**")
        return

    try:
        await calls.leave_group_call(chat_id)
        await remove_active_chat(chat_id)
        clear_queue(chat_id)
        await m.reply("✅ 成功退出语音聊天")
    except Exception as exc:
        traceback.print_exc()
        await m.reply(f"🚫 **错误:**\n\n`{exc}`")
@Client.on_message(
    command(["pause", f"pause@{BOT_USERNAME}", "vpause"]) & other_filters
)
@authorized_users_only
@check_blacklist()
async def pause(client, m: Message):
    """Pause the stream running in this chat.

    Replies with a "queue is empty" notice when the chat has no queue entry,
    and with an "already paused" notice when ``is_music_playing`` reports
    false. Otherwise the stream is paused and the state is recorded through
    ``music_off``. Exceptions are printed and reported back to the chat.
    """
    chat_id = m.chat.id
    if chat_id not in QUEUE:
        await m.reply("❌ **队列为空**")
        return

    try:
        if await is_music_playing(chat_id):
            await calls.pause_stream(chat_id)
            await music_off(chat_id)
            await m.reply(
                "⏸ **已经暂停**\n\n• **恢复播放请发送下列命令**\n» /resume"
            )
        else:
            await m.reply(" 已经暂停!")
    except Exception as exc:
        traceback.print_exc()
        await m.reply(f"🚫 **错误:**\n\n`{exc}`")
@Client.on_message(
    command(["resume", f"resume@{BOT_USERNAME}", "vresume"]) & other_filters
)
@authorized_users_only
@check_blacklist()
async def resume(client, m: Message):
    """Resume the stream in this chat.

    Replies with a "queue is empty" notice when the chat has no queue entry,
    and with an "already resumed" notice when ``is_music_playing`` reports
    true. Otherwise the stream is resumed and the state is recorded through
    ``music_on``. Exceptions are printed and reported back to the chat.
    """
    chat_id = m.chat.id
    if chat_id not in QUEUE:
        await m.reply("❌ **队列为空**")
        return

    try:
        if not await is_music_playing(chat_id):
            await calls.resume_stream(chat_id)
            await music_on(chat_id)
            await m.reply(
                "▶️ **已经恢复**\n\n• **暂停播放请发送下列命令**\n» /pause"
            )
        else:
            await m.reply(" 已经恢复!")
    except Exception as exc:
        traceback.print_exc()
        await m.reply(f"🚫 **错误:**\n\n`{exc}`")
@Client.on_message(command(["skip", f"skip@{BOT_USERNAME}", "vskip"]) & other_filters)
@authorized_users_only
@check_blacklist()
async def skip(c: Client, m: Message):
    """Skip the current track, or remove specific queue positions.

    Without arguments the current track is skipped via ``skip_current_song``.
    Its result is handled as follows: ``0`` -> queue is empty, ``1`` -> all
    tracks finished, ``2`` -> queue cleared; any other result is indexed as
    ``op[0]`` (title) and ``op[1]`` (link) to announce the next track with a
    generated thumbnail.

    With arguments, every decimal token is treated as a queue position and
    passed to ``skip_item``; position ``0`` and positions for which
    ``skip_item`` returns ``0`` are ignored. The summary reply is only sent
    when the chat has a queue entry.
    """
    user_id = m.from_user.id
    chat_id = m.chat.id

    if len(m.command) < 2:
        op = await skip_current_song(chat_id)
        if op == 0:
            await c.send_message(chat_id, "❌ 错误,队列为空!")
        elif op == 1:
            await c.send_message(chat_id, "» 全部歌曲已播放完毕,自动退出语音聊天")
        elif op == 2:
            await c.send_message(chat_id, "🗑️ **已经清空队列**\n\n• 自动退出语音聊天")
        else:
            buttons = stream_markup(user_id)
            requester = f"[{m.from_user.first_name}](tg://user?id={user_id})"
            ctitle = await CHAT_TITLE(m.chat.title)
            image = await thumb(f"{IMG_5}", f"{op[0]}", user_id, ctitle)
            try:
                await c.send_photo(
                    chat_id,
                    photo=image,
                    reply_markup=InlineKeyboardMarkup(buttons),
                    caption=f"⏭ **跳到下一首** \n\n"
                    f"🗂 **名称:** [{op[0]}]({op[1]})\n"
                    f"💭 **会话:** `{chat_id}`\n"
                    f"🧸 **添加者:** {requester}",
                )
            finally:
                # Clean up the generated thumbnail even when sending fails.
                remove_if_exists(image)
        return

    if chat_id not in QUEUE:
        return

    # ``isdecimal`` only admits tokens ``int`` can parse (``isdigit`` also
    # accepts characters such as "²" that make ``int`` raise ValueError).
    # The set drops repeated positions so one request cannot remove an
    # unintended extra track after the queue has shifted.
    positions = {int(token) for token in m.command[1:] if token.isdecimal()}
    positions.discard(0)

    lines = ["🗑 已从队列中删除:"]
    # Highest position first - presumably so earlier removals do not shift
    # the positions that are still pending (same order as before this fix).
    for position in sorted(positions, reverse=True):
        data = await skip_item(chat_id, position)
        if data != 0:
            lines.append(f"**#{position}** - {data}")
    await m.reply("\n".join(lines))
@Client.on_message(
    command(["mute", f"mute@{BOT_USERNAME}", "vmute"]) & other_filters
)
@authorized_users_only
@check_blacklist()
async def mute(client, m: Message):
    """Mute the stream in this chat.

    Replies with a "queue is empty" notice when the chat has no queue entry,
    and with an "already muted" notice when ``is_music_playing`` reports
    false. Otherwise the stream is muted and the state is recorded through
    ``music_off``. Exceptions are printed and reported back to the chat.
    """
    chat_id = m.chat.id
    if chat_id not in QUEUE:
        await m.reply("❌ **队列为空**")
        return

    try:
        if await is_music_playing(chat_id):
            await calls.mute_stream(chat_id)
            await music_off(chat_id)
            await m.reply(
                "🔇 **已经静音**\n\n• **解除静音请发送下列命令**\n» /unmute"
            )
        else:
            await m.reply(" 已经静音!")
    except Exception as exc:
        traceback.print_exc()
        await m.reply(f"🚫 **错误:**\n\n`{exc}`")
@Client.on_message(
    command(["unmute", f"unmute@{BOT_USERNAME}", "vunmute"]) & other_filters
)
@authorized_users_only
@check_blacklist()
async def unmute(client, m: Message):
    """Unmute the stream in this chat.

    Replies with a "queue is empty" notice when the chat has no queue entry,
    and with an "already resumed" notice when ``is_music_playing`` reports
    true. Otherwise the stream is unmuted and the state is recorded through
    ``music_on``. Exceptions are printed and reported back to the chat.
    """
    chat_id = m.chat.id
    if chat_id not in QUEUE:
        await m.reply("❌ **队列为空**")
        return

    try:
        if not await is_music_playing(chat_id):
            await calls.unmute_stream(chat_id)
            await music_on(chat_id)
            await m.reply(
                "🔊 **已解除静音**\n\n• **静音请发送下列命令**\n» /mute"
            )
        else:
            await m.reply(" 已经恢复!")
    except Exception as exc:
        traceback.print_exc()
        await m.reply(f"🚫 **错误:**\n\n`{exc}`")
@Client.on_callback_query(filters.regex("set_pause"))
@check_blacklist()
async def cbpause(_, query: CallbackQuery):
    """Inline-button handler that pauses the stream.

    The pressing user must have the ``can_manage_voice_chats`` permission in
    the chat; otherwise an alert is shown and nothing else happens. Failures
    while pausing are printed and written into the message together with the
    close markup.
    """
    chat_id = query.message.chat.id
    member = await _.get_chat_member(chat_id, query.from_user.id)
    if not member.can_manage_voice_chats:
        return await query.answer("💡 仅拥有 管理语音聊天 权限的管理员可以按按钮", show_alert=True)

    if chat_id not in QUEUE:
        await query.answer("❌ 队列为空", show_alert=True)
        return

    try:
        if await is_music_playing(chat_id):
            await calls.pause_stream(chat_id)
            await music_off(chat_id)
            await query.answer("⏸ 播放暂停!", show_alert=True)
        else:
            await query.answer(" 播放暂停!", show_alert=True)
    except Exception as exc:
        traceback.print_exc()
        await query.edit_message_text(f"🚫 **错误:**\n\n`{exc}`", reply_markup=close_mark)
@Client.on_callback_query(filters.regex("set_resume"))
@check_blacklist()
async def cbresume(_, query: CallbackQuery):
    """Inline-button handler that resumes the stream.

    The pressing user must have the ``can_manage_voice_chats`` permission in
    the chat; otherwise an alert is shown and nothing else happens. Failures
    while resuming are printed and written into the message together with the
    close markup.
    """
    chat_id = query.message.chat.id
    member = await _.get_chat_member(chat_id, query.from_user.id)
    if not member.can_manage_voice_chats:
        return await query.answer("💡 仅拥有 管理语音聊天 权限的管理员可以按按钮", show_alert=True)

    if chat_id not in QUEUE:
        await query.answer("❌ 队列为空", show_alert=True)
        return

    try:
        if not await is_music_playing(chat_id):
            await calls.resume_stream(chat_id)
            await music_on(chat_id)
            await query.answer("▶️ 恢复播放!", show_alert=True)
        else:
            await query.answer(" 恢复播放!", show_alert=True)
    except Exception as exc:
        traceback.print_exc()
        await query.edit_message_text(f"🚫 **错误:**\n\n`{exc}`", reply_markup=close_mark)
@Client.on_callback_query(filters.regex("set_stop"))
@check_blacklist()
async def cbstop(_, query: CallbackQuery):
    """Inline-button handler that stops playback and leaves the group call.

    The pressing user must have the ``can_manage_voice_chats`` permission in
    the chat; otherwise an alert is shown and nothing else happens. On
    success the message is replaced by a "stopped" notice; on failure the
    exception is printed and written into the message instead.
    """
    chat_id = query.message.chat.id
    member = await _.get_chat_member(chat_id, query.from_user.id)
    if not member.can_manage_voice_chats:
        return await query.answer("💡 仅拥有 管理语音聊天 权限的管理员可以按按钮", show_alert=True)

    if chat_id not in QUEUE:
        await query.answer("❌ 队列为空", show_alert=True)
        return

    try:
        await calls.leave_group_call(chat_id)
        await remove_active_chat(chat_id)
        clear_queue(chat_id)
        await query.edit_message_text("✅ **播放已经停止**", reply_markup=close_mark)
    except Exception as exc:
        traceback.print_exc()
        await query.edit_message_text(f"🚫 **错误:**\n\n`{exc}`", reply_markup=close_mark)
@Client.on_callback_query(filters.regex("set_mute"))
@check_blacklist()
async def cbmute(_, query: CallbackQuery):
    """Inline-button handler that mutes the stream.

    The pressing user must have the ``can_manage_voice_chats`` permission in
    the chat; otherwise an alert is shown and nothing else happens. Failures
    while muting are printed and written into the message together with the
    close markup.
    """
    chat_id = query.message.chat.id
    member = await _.get_chat_member(chat_id, query.from_user.id)
    if not member.can_manage_voice_chats:
        return await query.answer("💡 仅拥有 管理语音聊天 权限的管理员可以按按钮", show_alert=True)

    if chat_id not in QUEUE:
        await query.answer("❌ 队列为空", show_alert=True)
        return

    try:
        if await is_music_playing(chat_id):
            await calls.mute_stream(chat_id)
            await music_off(chat_id)
            await query.answer("🔇 已被静音", show_alert=True)
        else:
            await query.answer(" 静音成功!", show_alert=True)
    except Exception as exc:
        traceback.print_exc()
        await query.edit_message_text(f"🚫 **错误:**\n\n`{exc}`", reply_markup=close_mark)
@Client.on_callback_query(filters.regex("set_unmute"))
@check_blacklist()
async def cbunmute(_, query: CallbackQuery):
    """Inline-button handler that unmutes the stream.

    The pressing user must have the ``can_manage_voice_chats`` permission in
    the chat; otherwise an alert is shown and nothing else happens. Failures
    while unmuting are printed and written into the message together with the
    close markup.
    """
    chat_id = query.message.chat.id
    member = await _.get_chat_member(chat_id, query.from_user.id)
    if not member.can_manage_voice_chats:
        return await query.answer("💡 仅拥有 管理语音聊天 权限的管理员可以按按钮", show_alert=True)

    if chat_id not in QUEUE:
        await query.answer("❌ 队列为空", show_alert=True)
        return

    try:
        if not await is_music_playing(chat_id):
            await calls.unmute_stream(chat_id)
            await music_on(chat_id)
            await query.answer("🔊 解除静音成功", show_alert=True)
        else:
            await query.answer(" 解除静音成功!", show_alert=True)
    except Exception as exc:
        traceback.print_exc()
        await query.edit_message_text(f"🚫 **错误:**\n\n`{exc}`", reply_markup=close_mark)
@Client.on_message(
    command(["volume", f"volume@{BOT_USERNAME}", "vol"]) & other_filters
)
@authorized_users_only
@check_blacklist()
async def change_volume(client, m: Message):
    """Set the stream volume for this chat.

    Expects one argument: an integer in the advertised 0-200 range. A
    missing, non-numeric or out-of-range argument is answered with the usage
    text instead of being forwarded to the call backend. Replies with a
    "queue is empty" notice when the chat has no queue entry. Exceptions
    raised while changing the volume are printed and reported back to the
    chat.
    """
    usage = "使用方法:`/volume` (`0-200`)"
    if len(m.command) < 2:
        await m.reply_text(usage)
        return

    requested = m.command[1]
    try:
        level = int(requested)
    except ValueError:
        # Not a number at all: show the usage text rather than a raw error.
        await m.reply_text(usage)
        return
    if not 0 <= level <= 200:
        await m.reply_text(usage)
        return

    chat_id = m.chat.id
    if chat_id not in QUEUE:
        await m.reply("❌ **队列为空**")
        return

    try:
        await calls.change_volume_call(chat_id, volume=level)
        # Echo the argument exactly as the user typed it.
        await m.reply(
            f"✅ **音量调整到** `{requested}`%"
        )
    except Exception as exc:
        traceback.print_exc()
        await m.reply(f"🚫 **错误:**\n\n`{exc}`")