""" Video + Music Stream Telegram Bot Copyright (c) 2022-present levina=lab This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but without any warranty; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see """ import asyncio from datetime import datetime from sys import version_info from time import time from config import ( ALIVE_IMG, ALIVE_NAME, BOT_USERNAME, GROUP_SUPPORT, OWNER_USERNAME, UPDATES_CHANNEL, ) from driver.decorators import check_blacklist from program import __version__ from driver.core import bot, me_bot, me_user from driver.filters import command from driver.database.dbchat import add_served_chat, is_served_chat from driver.database.dbpunish import is_gbanned_user from driver.database.dbusers import add_served_user, is_served_user from driver.database.dblockchat import blacklisted_chats from pyrogram import Client, filters, __version__ as pyrover from pyrogram.errors import FloodWait, ChatAdminRequired from pytgcalls import (__version__ as pytover) from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message, ChatJoinRequest __major__ = 0 __minor__ = 2 __micro__ = 1 __python_version__ = f"{version_info[0]}.{version_info[1]}.{version_info[2]}" START_TIME = datetime.utcnow() START_TIME_ISO = START_TIME.replace(microsecond=0).isoformat() TIME_DURATION_UNITS = ( ("week", 60 * 60 * 24 * 7), ("day", 60 * 60 * 24), ("hour", 60 * 60), ("min", 60), ("sec", 1), ) async def _human_time_duration(seconds): if seconds == 0: return "inf" parts = [] for unit, div in TIME_DURATION_UNITS: amount, seconds = divmod(int(seconds), div) if amount > 0: parts.append("{} {}{}".format(amount, unit, "" if amount == 1 else "s")) return ", ".join(parts) @Client.on_message( command(["start", f"start@{BOT_USERNAME}"]) & filters.private & ~filters.edited ) @check_blacklist() async def start_(c: Client, message: Message): user_id = message.from_user.id if await is_served_user(user_id): pass else: await add_served_user(user_id) return await message.reply_text( f"""โœจ **Welcome {message.from_user.mention()} !**\n ๐Ÿ’ญ [{me_bot.first_name}](https://t.me/{BOT_USERNAME}) **Is a bot to play music and video in groups, through the Telegram Group video chat!** ๐Ÿ’ก **Find out all the Bot's commands and how they work by clicking on the ยป ๐Ÿ“š Commands button!** ๐Ÿ”– **To know how to use this bot, please click on the ยป โ“ Basic Guide button!** """, reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( "โž• Add me to your Group โž•", url=f"https://t.me/{BOT_USERNAME}?startgroup=true", ) ], [InlineKeyboardButton("โ“ Basic Guide", callback_data="user_guide")], [ InlineKeyboardButton("๐Ÿ“š Commands", callback_data="command_list"), InlineKeyboardButton("โค๏ธ Donate", url=f"https://t.me/{OWNER_USERNAME}"), ], [ InlineKeyboardButton( "๐Ÿ‘ฅ Official Group", url=f"https://t.me/{GROUP_SUPPORT}" ), InlineKeyboardButton( "๐Ÿ“ฃ Official Channel", url=f"https://t.me/{UPDATES_CHANNEL}" ), ], [ InlineKeyboardButton( "๐ŸŒ Source Code", url="https://github.com/levina-lab/video-stream" ) ], ] ), disable_web_page_preview=True, ) @Client.on_message( command(["alive", f"alive@{BOT_USERNAME}"]) & filters.group & ~filters.edited ) @check_blacklist() async def alive(c: Client, message: Message): chat_id = message.chat.id current_time = datetime.utcnow() uptime_sec = (current_time - START_TIME).total_seconds() uptime = await _human_time_duration(int(uptime_sec)) keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton("โœจ Group", url=f"https://t.me/{GROUP_SUPPORT}"), InlineKeyboardButton( "๐Ÿ“ฃ Channel", url=f"https://t.me/{UPDATES_CHANNEL}" ), ] ] ) alive = f"**Hello {message.from_user.mention()}, I'm {me_bot.first_name}**\n\n๐Ÿง‘๐Ÿผโ€๐Ÿ’ป My Master: [{ALIVE_NAME}](https://t.me/{OWNER_USERNAME})\n๐Ÿ‘พ Bot Version: `v{__version__}`\n๐Ÿ”ฅ Pyrogram Version: `{pyrover}`\n๐Ÿ Python Version: `{__python_version__}`\nโœจ PyTgCalls Version: `{pytover.__version__}`\n๐Ÿ†™ Uptime Status: `{uptime}`\n\nโค **Thanks for Adding me here, for playing video & music on your Group's video chat**" await c.send_photo( chat_id, photo=f"{ALIVE_IMG}", caption=alive, reply_markup=keyboard, ) @Client.on_message(command(["ping", f"ping@{BOT_USERNAME}"]) & ~filters.edited) @check_blacklist() async def ping_pong(c: Client, message: Message): start = time() m_reply = await message.reply_text("pinging...") delta_ping = time() - start await m_reply.edit_text("๐Ÿ“ `PONG!!`\n" f"โšก๏ธ `{delta_ping * 1000:.3f} ms`") @Client.on_message(command(["uptime", f"uptime@{BOT_USERNAME}"]) & ~filters.edited) @check_blacklist() async def get_uptime(c: Client, message: Message): current_time = datetime.utcnow() uptime_sec = (current_time - START_TIME).total_seconds() uptime = await _human_time_duration(int(uptime_sec)) await message.reply_text( "๐Ÿค– bot status:\n" f"โ€ข **uptime:** `{uptime}`\n" f"โ€ข **start time:** `{START_TIME_ISO}`" ) @Client.on_chat_join_request() async def approve_join_chat(c: Client, m: ChatJoinRequest): if not m.from_user: return try: await c.approve_chat_join_request(m.chat.id, m.from_user.id) except FloodWait as e: await asyncio.sleep(e.x + 2) await c.approve_chat_join_request(m.chat.id, m.from_user.id) @Client.on_message(filters.new_chat_members) async def new_chat(c: Client, m: Message): chat_id = m.chat.id if await is_served_chat(chat_id): pass else: await add_served_chat(chat_id) for member in m.new_chat_members: if chat_id in await blacklisted_chats(): await m.reply( "โ—๏ธ This chat has blacklisted by sudo user and You're not allowed to use me in this chat." ) return await bot.leave_chat(chat_id) if member.id == me_bot.id: return await m.reply( "โค๏ธ Thanks for adding me to the **Group** !\n\n" "Appoint me as administrator in the **Group**, otherwise I will not be able to work properly, and don't forget to type `/userbotjoin` for invite the assistant.\n\n" "Once done, then type `/reload`", reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton("๐Ÿ“ฃ Channel", url=f"https://t.me/{UPDATES_CHANNEL}"), InlineKeyboardButton("๐Ÿ’ญ Support", url=f"https://t.me/{GROUP_SUPPORT}") ], [ InlineKeyboardButton("๐Ÿ‘ค Assistant", url=f"https://t.me/{me_user.username}") ] ] ) ) chat_watcher_group = 5 @Client.on_message(group=chat_watcher_group) async def chat_watcher_func(_, message: Message): userid = message.from_user.id suspect = f"[{message.from_user.first_name}](tg://user?id={message.from_user.id})" if await is_gbanned_user(userid): try: await message.chat.ban_member(userid) except ChatAdminRequired: LOGS.info(f"can't remove gbanned user from chat: {message.chat.id}") return await message.reply_text( f"๐Ÿ‘ฎ๐Ÿผ (> {suspect} <)\n\n**Gbanned** user detected, that user has been gbanned by sudo user and was blocked from this Chat !\n\n๐Ÿšซ **Reason:** potential spammer and abuser." )