video-stream/program/punishment.py

229 lines
8.8 KiB
Python
Raw Permalink Normal View History

2022-02-22 23:35:10 +00:00
"""
Video + Music Stream Telegram Bot
Copyright (c) 2022-present levina=lab <https://github.com/levina-lab>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but without any warranty; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/licenses.html>
"""
2022-01-31 12:41:47 +00:00
2022-02-11 16:11:56 +00:00
2022-01-31 12:41:47 +00:00
import asyncio
2022-02-16 13:19:44 +00:00
from pyrogram import Client
2022-01-31 12:41:47 +00:00
from pyrogram.types import Message
from pyrogram.errors import FloodWait
2022-02-22 23:35:10 +00:00
2022-02-15 04:44:58 +00:00
from driver.core import me_bot
2022-02-08 02:42:04 +00:00
from driver.filters import command, other_filters
2022-02-08 22:05:59 +00:00
from driver.decorators import bot_creator
2022-01-31 12:41:47 +00:00
from driver.database.dbchat import get_served_chats
from driver.database.dbpunish import add_gban_user, is_gbanned_user, remove_gban_user
2022-02-20 13:44:48 +00:00
from config import OWNER_ID, SUDO_USERS, BOT_USERNAME as bn
2022-01-31 12:41:47 +00:00
2022-02-08 02:42:04 +00:00
async def _gban_in_chats(c: Client, chat_ids, user_id) -> int:
    """Ban *user_id* in every chat of *chat_ids* and return the success count.

    Banning is best-effort: a chat where the ban fails (for any reason other
    than a flood wait) is skipped so one bad chat cannot abort the sweep.
    """
    banned = 0
    for chat_id in chat_ids:
        # Two attempts: the second one only runs after a FloodWait pause, so
        # the chat that triggered the flood wait is not silently skipped.
        for _attempt in range(2):
            try:
                await c.ban_chat_member(chat_id, user_id)
            except FloodWait as e:
                await asyncio.sleep(int(e.x))
                continue
            except Exception:
                # Deliberate best-effort: give up on this chat only.
                break
            banned += 1
            # Small pause between successful calls to stay under rate limits.
            await asyncio.sleep(1)
            break
    return banned


@Client.on_message(command(["gban", f"gban@{bn}"]) & other_filters)
@bot_creator
async def global_banned(c: Client, message: Message):
    """Handle ``/gban``: record a global ban and ban the user in served chats.

    The target is the author of the replied-to message or, when the command
    is not a reply, the username / numeric user id given as first argument.
    The command is refused for the sender, the bot itself, members of
    ``SUDO_USERS`` / ``OWNER_ID`` and users that are already gbanned.
    """
    replied = message.reply_to_message
    if replied:
        user = replied.from_user
        if user is None:
            # Channel posts / anonymous messages carry no user to ban.
            await message.reply_text("I can't identify the sender of that message !")
            return
    else:
        if len(message.command) < 2:
            await message.reply_text("**usage:**\n\n/gban [username | user_id]")
            return
        target = message.text.split()[1].replace("@", "")
        # Pyrogram resolves an all-digit *string* as a phone number, so a
        # numeric user id has to be passed as an int.
        if target.isdecimal():
            target = int(target)
        user = await c.get_users(target)

    sender = message.from_user
    if user.id == sender.id:
        await message.reply_text("You can't gban yourself !")
        return
    if user.id == me_bot.id:
        await message.reply_text("I can't gban myself !")
        return
    if user.id in SUDO_USERS:
        await message.reply_text("You can't gban sudo user !")
        return
    if user.id in OWNER_ID:
        await message.reply_text("You can't gban my creator !")
        return
    # Checked on both the reply path and the argument path.
    if await is_gbanned_user(user.id):
        await message.reply_text("This user already gbanned !")
        return

    await add_gban_user(user.id)
    served_chats = [int(chat["chat_id"]) for chat in await get_served_chats()]
    progress = await message.reply_text(
        f"🚷 **Globally banning {user.mention}**\n⏱ Expected time: `{len(served_chats)}`"
    )
    number_of_chats = await _gban_in_chats(c, served_chats, user.id)

    # The headline's bold marker is closed here so it does not pair with the
    # "**" that opens the next line.
    ban_text = (
        f"\n🚷 **New Global ban on [{me_bot.first_name}](https://t.me/{bn})**"
        f"\n**Origin:** {message.chat.title} [`{message.chat.id}`]"
        f"\n**Sudo User:** {sender.mention}"
        f"\n**Banned User:** {user.mention}"
        f"\n**Banned User ID:** `{user.id}`"
        f"\n**Chats:** `{number_of_chats}`"
    )
    try:
        await progress.delete()
    except Exception:
        # Failing to remove the progress message must not block the report.
        pass
    await message.reply_text(ban_text, disable_web_page_preview=True)
2022-02-08 02:42:04 +00:00
async def _ungban_in_chats(c: Client, chat_ids, user_id) -> None:
    """Lift the ban on *user_id* in every chat of *chat_ids* (best-effort)."""
    for chat_id in chat_ids:
        # Two attempts: the second one only runs after a FloodWait pause, so
        # the chat that triggered the flood wait is not silently skipped.
        for _attempt in range(2):
            try:
                await c.unban_chat_member(chat_id, user_id)
            except FloodWait as e:
                await asyncio.sleep(int(e.x))
                continue
            except Exception:
                # Deliberate best-effort: give up on this chat only.
                # ``Exception`` (not ``BaseException``) so task cancellation
                # and interpreter shutdown still propagate.
                break
            # Small pause between successful calls to stay under rate limits.
            await asyncio.sleep(1)
            break


@Client.on_message(command(["ungban", f"ungban@{bn}"]) & other_filters)
@bot_creator
async def ungban_global(c: Client, message: Message):
    """Handle ``/ungban``: drop a global ban and unban the user in served chats.

    The target is the author of the replied-to message or, when the command
    is not a reply, the username / numeric user id given as the only argument.
    The command is refused for the sender, the bot itself, members of
    ``SUDO_USERS`` / ``OWNER_ID`` and users that are not currently gbanned.
    """
    replied = message.reply_to_message
    if replied:
        target_user = replied.from_user
        if target_user is None:
            # Channel posts / anonymous messages carry no user to unban.
            await message.reply_text("I can't identify the sender of that message !")
            return
        user_id = target_user.id
    else:
        if len(message.command) != 2:
            await message.reply_text(
                "**usage:**\n\n/ungban [username | user_id]"
            )
            return
        target = message.text.split()[1].replace("@", "")
        # Pyrogram resolves an all-digit *string* as a phone number, so a
        # numeric user id has to be passed as an int.
        if target.isdecimal():
            target = int(target)
        user_id = (await c.get_users(target)).id

    if user_id == message.from_user.id:
        await message.reply_text("You can't ungban yourself because you can't be gbanned !")
        return
    if user_id == me_bot.id:
        await message.reply_text("I can't ungban myself because i can't be gbanned !")
        return
    if user_id in SUDO_USERS:
        await message.reply_text("Sudo users can't be gbanned/ungbanned !")
        return
    if user_id in OWNER_ID:
        await message.reply_text("Bot creator can't be gbanned/ungbanned !")
        return
    if not await is_gbanned_user(user_id):
        await message.reply_text("This user is not gbanned !")
        return

    msg = await message.reply_text("» ungbanning user...")
    await remove_gban_user(user_id)
    served_chats = [int(chat["chat_id"]) for chat in await get_served_chats()]
    await _ungban_in_chats(c, served_chats, user_id)
    await msg.edit_text("✅ This user has ungbanned")