video-stream/program/start.py
levina 592fc9bef4
detect chat
if the chat is in blacklisted chat database bot must leave from that chat
2022-02-07 19:06:48 +07:00

233 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from datetime import datetime
from sys import version_info
from time import time
from config import (
ALIVE_IMG,
ALIVE_NAME,
BOT_NAME,
BOT_USERNAME,
GROUP_SUPPORT,
OWNER_NAME,
UPDATES_CHANNEL,
)
from program import __version__
from driver.veez import user, bot
from driver.filters import command, other_filters
from driver.database.dbchat import add_served_chat, is_served_chat
from driver.database.dbpunish import is_gbanned_user
from driver.database.dbusers import add_served_user
from driver.database.dblockchat import blacklisted_chats
from pyrogram import Client, filters, __version__ as pyrover
from pyrogram.errors import FloodWait, MessageNotModified
from pytgcalls import (__version__ as pytover)
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message, ChatJoinRequest
__major__ = 0
__minor__ = 2
__micro__ = 1
__python_version__ = f"{version_info[0]}.{version_info[1]}.{version_info[2]}"
START_TIME = datetime.utcnow()
START_TIME_ISO = START_TIME.replace(microsecond=0).isoformat()
TIME_DURATION_UNITS = (
("week", 60 * 60 * 24 * 7),
("day", 60 * 60 * 24),
("hour", 60 * 60),
("min", 60),
("sec", 1),
)
async def _human_time_duration(seconds):
if seconds == 0:
return "inf"
parts = []
for unit, div in TIME_DURATION_UNITS:
amount, seconds = divmod(int(seconds), div)
if amount > 0:
parts.append("{} {}{}".format(amount, unit, "" if amount == 1 else "s"))
return ", ".join(parts)
@Client.on_message(
command(["start", f"start@{BOT_USERNAME}"]) & filters.private & ~filters.edited
)
async def start_(c: Client, message: Message):
user_id = message.from_user.id
if await is_gbanned_user(user_id):
await message.reply_text("❗️ **You've blocked from using this bot!**")
return
await message.reply_text(
f"""✨ **Welcome {message.from_user.mention()} !**\n
💭 [{BOT_NAME}](https://t.me/{BOT_USERNAME}) **Allows you to play music and video on groups through the Telegram Group video chat!**
💡 **Find out all the Bot's commands and how they work by clicking on the » 📚 Commands button!**
🔖 **To know how to use this bot, please click on the » ❓ Basic Guide button!**
""",
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
" Add me to your Group ",
url=f"https://t.me/{BOT_USERNAME}?startgroup=true",
)
],
[InlineKeyboardButton("❓ Basic Guide", callback_data="cbhowtouse")],
[
InlineKeyboardButton("📚 Commands", callback_data="cbcmds"),
InlineKeyboardButton("❤️ Donate", url=f"https://t.me/{OWNER_NAME}"),
],
[
InlineKeyboardButton(
"👥 Official Group", url=f"https://t.me/{GROUP_SUPPORT}"
),
InlineKeyboardButton(
"📣 Official Channel", url=f"https://t.me/{UPDATES_CHANNEL}"
),
],
[
InlineKeyboardButton(
"🌐 Source Code", url="https://github.com/levina-lab/video-stream"
)
],
]
),
disable_web_page_preview=True,
)
@Client.on_message(
command(["alive", f"alive@{BOT_USERNAME}"]) & filters.group & ~filters.edited
)
async def alive(c: Client, message: Message):
user_id = message.from_user.id
if await is_gbanned_user(user_id):
await message.reply_text("❗️ **You've blocked from using this bot!**")
return
chat_id = message.chat.id
current_time = datetime.utcnow()
uptime_sec = (current_time - START_TIME).total_seconds()
uptime = await _human_time_duration(int(uptime_sec))
keyboard = InlineKeyboardMarkup(
[
[
InlineKeyboardButton("✨ Group", url=f"https://t.me/{GROUP_SUPPORT}"),
InlineKeyboardButton(
"📣 Channel", url=f"https://t.me/{UPDATES_CHANNEL}"
),
]
]
)
alive = f"**Hello {message.from_user.mention()}, i'm {BOT_NAME}**\n\n🧑🏼‍💻 My Master: [{ALIVE_NAME}](https://t.me/{OWNER_NAME})\n👾 Bot Version: `v{__version__}`\n🔥 Pyrogram Version: `{pyrover}`\n🐍 Python Version: `{__python_version__}`\n✨ PyTgCalls Version: `{pytover.__version__}`\n🆙 Uptime Status: `{uptime}`\n\n❤ **Thanks for Adding me here, for playing video & music on your Group's video chat**"
await c.send_photo(
chat_id,
photo=f"{ALIVE_IMG}",
caption=alive,
reply_markup=keyboard,
)
@Client.on_message(command(["ping", f"ping@{BOT_USERNAME}"]) & ~filters.edited)
async def ping_pong(c: Client, message: Message):
user_id = message.from_user.id
if await is_gbanned_user(user_id):
await message.reply_text("❗️ **You've blocked from using this bot!**")
return
start = time()
m_reply = await message.reply_text("pinging...")
delta_ping = time() - start
await m_reply.edit_text("🏓 `PONG!!`\n" f"⚡️ `{delta_ping * 1000:.3f} ms`")
@Client.on_message(command(["uptime", f"uptime@{BOT_USERNAME}"]) & ~filters.edited)
async def get_uptime(c: Client, message: Message):
user_id = message.from_user.id
if await is_gbanned_user(user_id):
await message.reply_text("❗️ **You've blocked from using this bot!**")
return
current_time = datetime.utcnow()
uptime_sec = (current_time - START_TIME).total_seconds()
uptime = await _human_time_duration(int(uptime_sec))
await message.reply_text(
"🤖 bot status:\n"
f"• **uptime:** `{uptime}`\n"
f"• **start time:** `{START_TIME_ISO}`"
)
@Client.on_chat_join_request()
async def approve_join_chat(c: Client, m: ChatJoinRequest):
if not m.from_user:
return
try:
await c.approve_chat_join_request(m.chat.id, m.from_user.id)
except FloodWait as e:
await asyncio.sleep(e.x + 2)
await c.approve_chat_join_request(m.chat.id, m.from_user.id)
@Client.on_message(filters.new_chat_members)
async def new_chat(c: Client, m: Message):
chat_id = m.chat.id
if await is_served_chat(chat_id):
pass
else:
await add_served_chat(chat_id)
ass_uname = (await user.get_me()).username
bot_id = (await c.get_me()).id
for member in m.new_chat_members:
if chat_id in blacklisted_chats():
await m.reply(
"❗️ This chat has blacklisted by sudo user and You're not allowed to use me in this chat."
)
return await bot.leave_chat(chat_id)
if member.id == bot_id:
return await m.reply(
"❤️ Thanks for adding me to the **Group** !\n\n"
"Appoint me as administrator in the **Group**, otherwise I will not be able to work properly, and don't forget to type `/userbotjoin` for invite the assistant.\n\n"
"Once done, then type `/reload`",
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton("📣 Channel", url=f"https://t.me/{UPDATES_CHANNEL}"),
InlineKeyboardButton("💭 Support", url=f"https://t.me/{GROUP_SUPPORT}")
],
[
InlineKeyboardButton("👤 Assistant", url=f"https://t.me/{ass_uname}")
]
]
)
)
chat_watcher_group = 10
@Client.on_message(group=chat_watcher_group)
async def chat_watcher_func(_, message: Message):
if message.from_user:
user_id = message.from_user.id
await add_served_user(user_id)
return
try:
userid = message.from_user.id
except Exception:
return
suspect = f"[{message.from_user.first_name}](tg://user?id={message.from_user.id})"
if await is_gbanned_user(userid):
try:
await message.chat.ban_member(userid)
except Exception:
return
await message.reply_text(
f"👮🏼 (> {suspect} <)\n\n**Gbanned** user detected, that user has been gbanned by sudo user and was blocked from this Chat !\n\n🚫 **Reason:** potential spammer and abuser."
)