"""Basic command handlers and chat bookkeeping for the bot.

Registers Pyrogram handlers for ``/start``, ``/alive``, ``/ping`` and
``/uptime``, approves chat join requests, greets a group when the bot is
added to it, and watches every message to record users and act on
globally banned ones.

NOTE(review): several user-facing strings below start with characters that
look like mis-encoded emoji. They are runtime text and are kept exactly as
found; confirm the intended characters against the upstream file.
"""

import asyncio

from datetime import datetime
from sys import version_info
from time import time

from config import (
    ALIVE_IMG,
    ALIVE_NAME,
    BOT_NAME,
    BOT_USERNAME,
    GROUP_SUPPORT,
    OWNER_NAME,
    UPDATES_CHANNEL,
)
from program import __version__
from driver.veez import user
from driver.filters import command, other_filters
from driver.database.dbchat import add_served_chat, is_served_chat
from driver.database.dbpunish import is_gbanned_user
from driver.database.dbusers import add_served_user

from pyrogram import Client, filters, __version__ as pyrover
from pyrogram.errors import FloodWait, MessageNotModified
from pytgcalls import (__version__ as pytover)
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message, ChatJoinRequest


__major__ = 0
__minor__ = 2
__micro__ = 1

__python_version__ = f"{version_info[0]}.{version_info[1]}.{version_info[2]}"

# Captured once at import time; every uptime figure is measured from here.
START_TIME = datetime.utcnow()
START_TIME_ISO = START_TIME.replace(microsecond=0).isoformat()

# (unit name, seconds per unit), ordered from largest to smallest.
TIME_DURATION_UNITS = (
    ("week", 60 * 60 * 24 * 7),
    ("day", 60 * 60 * 24),
    ("hour", 60 * 60),
    ("min", 60),
    ("sec", 1),
)

# Reply sent to globally banned users by every command handler.
_BLOCKED_TEXT = "โ—๏ธ **You've been blocked from using this bot!"


async def _human_time_duration(seconds):
    """Format a number of seconds as text such as ``"1 hour, 2 mins"``.

    Args:
        seconds: Duration in seconds; it is truncated to an integer.

    Returns:
        The non-zero units joined by ``", "``, or ``"inf"`` when ``seconds``
        is exactly 0.
    """
    if seconds == 0:
        return "inf"
    remaining = int(seconds)
    parts = []
    for unit, div in TIME_DURATION_UNITS:
        amount, remaining = divmod(remaining, div)
        if amount > 0:
            plural = "" if amount == 1 else "s"
            parts.append(f"{amount} {unit}{plural}")
    return ", ".join(parts)


async def _uptime_text():
    """Return the time elapsed since ``START_TIME`` as human-readable text."""
    elapsed = (datetime.utcnow() - START_TIME).total_seconds()
    return await _human_time_duration(int(elapsed))


async def _reject_if_gbanned(message: Message) -> bool:
    """Tell a globally banned sender they are blocked.

    Returns:
        True when the sender is gbanned (the blocked notice has been sent and
        the caller should stop), False otherwise. A message without
        ``from_user`` is never treated as banned.
    """
    sender = message.from_user
    if sender is None:
        return False
    if await is_gbanned_user(sender.id):
        await message.reply_text(_BLOCKED_TEXT)
        return True
    return False


@Client.on_message(
    command(["start", f"start@{BOT_USERNAME}"]) & filters.private & ~filters.edited
)
async def start_(c: Client, message: Message):
    """Reply to ``/start`` in a private chat with the welcome text and menu."""
    if await _reject_if_gbanned(message):
        return
    # NOTE(review): the sentences are separated by single spaces exactly as
    # they appear in this file; confirm whether line breaks were intended.
    welcome = (
        f"โœจ **Welcome {message.from_user.mention()} !**\n "
        f"๐Ÿ’ญ [{BOT_NAME}](https://t.me/{BOT_USERNAME}) **Allows you to play music and video on groups through the Telegram Group video chat!** "
        "๐Ÿ’ก **Find out all the Bot's commands and how they work by clicking on the ยป ๐Ÿ“š Commands button!** "
        "๐Ÿ”– **To know how to use this bot, please click on the ยป โ“ Basic Guide button!** "
    )
    await message.reply_text(
        welcome,
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        "โž• Add me to your Group โž•",
                        url=f"https://t.me/{BOT_USERNAME}?startgroup=true",
                    )
                ],
                [InlineKeyboardButton("โ“ Basic Guide", callback_data="cbhowtouse")],
                [
                    InlineKeyboardButton("๐Ÿ“š Commands", callback_data="cbcmds"),
                    InlineKeyboardButton("โค๏ธ Donate", url=f"https://t.me/{OWNER_NAME}"),
                ],
                [
                    InlineKeyboardButton(
                        "๐Ÿ‘ฅ Official Group", url=f"https://t.me/{GROUP_SUPPORT}"
                    ),
                    InlineKeyboardButton(
                        "๐Ÿ“ฃ Official Channel", url=f"https://t.me/{UPDATES_CHANNEL}"
                    ),
                ],
                [
                    InlineKeyboardButton(
                        "๐ŸŒ Source Code", url="https://github.com/levina-lab/video-stream"
                    )
                ],
            ]
        ),
        disable_web_page_preview=True,
    )


@Client.on_message(
    command(["alive", f"alive@{BOT_USERNAME}"]) & filters.group & ~filters.edited
)
async def alive(c: Client, message: Message):
    """Reply to ``/alive`` in a group with a photo and version/uptime caption."""
    if message.from_user is None:
        # The caption mentions the sender; without a user there is nobody to
        # address (previously this path failed with AttributeError).
        return
    if await _reject_if_gbanned(message):
        return
    chat_id = message.chat.id
    uptime = await _uptime_text()

    keyboard = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton("โœจ Group", url=f"https://t.me/{GROUP_SUPPORT}"),
                InlineKeyboardButton(
                    "๐Ÿ“ฃ Channel", url=f"https://t.me/{UPDATES_CHANNEL}"
                ),
            ]
        ]
    )

    # `pytover` is whatever `pytgcalls.__version__` resolves to; presumably a
    # module exposing `__version__` in some releases and the plain version
    # string in others, so accept both.
    pytgcalls_version = getattr(pytover, "__version__", pytover)

    caption = (
        f"**Hello {message.from_user.mention()}, i'm {BOT_NAME}**\n\n"
        f"๐Ÿง‘๐Ÿผโ€๐Ÿ’ป My Master: [{ALIVE_NAME}](https://t.me/{OWNER_NAME})\n"
        f"๐Ÿ‘พ Bot Version: `v{__version__}`\n"
        f"๐Ÿ”ฅ Pyrogram Version: `{pyrover}`\n"
        f"๐Ÿ Python Version: `{__python_version__}`\n"
        f"โœจ PyTgCalls Version: `{pytgcalls_version}`\n"
        f"๐Ÿ†™ Uptime Status: `{uptime}`\n\n"
        "โค **Thanks for Adding me here, for playing video & music on your Group's video chat**"
    )

    await c.send_photo(
        chat_id,
        photo=f"{ALIVE_IMG}",
        caption=caption,
        reply_markup=keyboard,
    )


@Client.on_message(command(["ping", f"ping@{BOT_USERNAME}"]) & ~filters.edited)
async def ping_pong(c: Client, message: Message):
    """Reply to ``/ping`` with the time it took to send a reply message."""
    if await _reject_if_gbanned(message):
        return
    start = time()
    m_reply = await message.reply_text("pinging...")
    delta_ping = time() - start
    await m_reply.edit_text("๐Ÿ“ `PONG!!`\n" f"โšก๏ธ `{delta_ping * 1000:.3f} ms`")


@Client.on_message(command(["uptime", f"uptime@{BOT_USERNAME}"]) & ~filters.edited)
async def get_uptime(c: Client, message: Message):
    """Reply to ``/uptime`` with the elapsed uptime and the start timestamp."""
    if await _reject_if_gbanned(message):
        return
    uptime = await _uptime_text()
    await message.reply_text(
        "๐Ÿค– bot status:\n"
        f"โ€ข **uptime:** `{uptime}`\n"
        f"โ€ข **start time:** `{START_TIME_ISO}`"
    )


@Client.on_chat_join_request()
async def approve_join_chat(c: Client, m: ChatJoinRequest):
    """Approve an incoming join request, retrying once after a FloodWait."""
    if not m.from_user:
        return
    try:
        await c.approve_chat_join_request(m.chat.id, m.from_user.id)
    except FloodWait as e:
        # Wait the period carried by the error plus a 2 second margin, then
        # try exactly once more.
        await asyncio.sleep(e.x + 2)
        await c.approve_chat_join_request(m.chat.id, m.from_user.id)


@Client.on_message(filters.new_chat_members)
async def new_chat(c: Client, m: Message):
    """Record the chat and, if the bot itself was added, post setup hints."""
    chat_id = m.chat.id
    if not await is_served_chat(chat_id):
        await add_served_chat(chat_id)

    bot_id = (await c.get_me()).id
    if not any(member.id == bot_id for member in m.new_chat_members):
        return None

    # Only needed for the "Assistant" button, so it is looked up after we
    # know the greeting will actually be sent.
    ass_uname = (await user.get_me()).username
    return await m.reply(
        "โค๏ธ Thanks for adding me to the **Group** !\n\n"
        "Appoint me as administrator in the **Group**, otherwise I will not be able to work properly, and don't forget to type `/userbotjoin` for invite the assistant.\n\n"
        "Once done, then type `/reload`",
        reply_markup=InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton("๐Ÿ“ฃ Channel", url=f"https://t.me/{UPDATES_CHANNEL}"),
                    InlineKeyboardButton("๐Ÿ’ญ Support", url=f"https://t.me/{GROUP_SUPPORT}"),
                ],
                [
                    InlineKeyboardButton("๐Ÿ‘ค Assistant", url=f"https://t.me/{ass_uname}")
                ],
            ]
        ),
    )


chat_watcher_group = 10


@Client.on_message(group=chat_watcher_group)
async def chat_watcher_func(_, message: Message):
    """Record the sender of every message and ban them if they are gbanned.

    Messages without ``from_user`` are ignored. Previously the function
    returned right after recording the user, so the ban logic below could
    never run.
    """
    sender = message.from_user
    if sender is None:
        return
    userid = sender.id
    await add_served_user(userid)

    if not await is_gbanned_user(userid):
        return
    try:
        await message.chat.ban_member(userid)
    except Exception:
        # Best effort: if the ban call fails for any reason, stay silent
        # rather than announce a ban that did not happen.
        return
    suspect = f"[{sender.first_name}](tg://user?id={sender.id})"
    await message.reply_text(
        f"๐Ÿ‘ฎ๐Ÿผ (> {suspect} <)\n\n**Gbanned** user detected, that user has been gbanned by sudo user and was blocked from this Chat !\n\n๐Ÿšซ **Reason:** potential spammer and abuser."
    )