This commit is contained in:
levina 2021-10-11 10:15:00 +07:00 committed by GitHub
parent ba6b2b6d68
commit 396abcf558
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 0 additions and 897 deletions

View File

@ -1,3 +0,0 @@
"""plugins"""
__version__ = "0.4.0"

View File

@ -1,76 +0,0 @@
# Copyright (C) 2021 By VeezMusicProject
# ===========
# running bot
# ===========
import logging
import time
import sys
import asyncio
import glob
import importlib
from pathlib import Path
from pyrogram import Client, idle
from config import Veez
from bot.videoplayer import app
from bot.videoplayer import call_py
from helpers.loggings import LOG
bot = Client(
":memory:",
Veez.API_ID,
Veez.API_HASH,
bot_token=Veez.BOT_TOKEN,
plugins=dict(root="bot"),
)
StartTime = time.time()
loop = asyncio.get_event_loop()
_path = f"bot/*.py"
files = glob.glob(_path)
def load_plugins(plugin_name):
path = Path(f"bot/{plugin_name}.py")
name = "bot.{}".format(plugin_name)
spec = importlib.util.spec_from_file_location(name, path)
load = importlib.util.module_from_spec(spec)
spec.loader.exec_module(load)
sys.modules[f"bot." + plugin_name] = load
print("Imported => " + plugin_name)
async def start():
print('\n')
print('------------------ Initalizing VEEZ --------------------')
if bot:
await bot.start()
await app.start()
await call_py.start()
print('------------------------ DONE --------------------------')
print('------------------ Importing Modules -------------------')
for name in files:
with open(name) as a:
path_ = Path(a.name)
plugin_name = path_.stem
load_plugins(plugin_name.replace(".py", ""))
print('------------------- INITIATED VEEZ ---------------------')
print(' Logged in as User =>> {}'.format((await app.get_me()).first_name))
if bot:
print(' Logged in to Bots =>> {}'.format((await bot.get_me()).first_name))
print('--------------------------------------------------------')
await idle()
if __name__ == '__main__':
is_bot = bool(Veez.BOT_TOKEN)
loop.run_until_complete(start())
# bot.start()
# print("[STATUS]:✅ »» BOT CLIENT STARTED ««")
# app.start()
# print("[STATUS]:✅ »» USERBOT CLIENT STARTED ««")
# call_py.start()
# print("[STATUS]:✅ »» PYTGCALLS CLIENT STARTED ««")
# idle()
# print("[STATUS]:❌ »» BOT STOPPED ««")

View File

@ -1,137 +0,0 @@
# Copyright (C) 2021 By VeezMusicProject
from pyrogram import Client, filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from config import Veez
@Client.on_callback_query(filters.regex("cbguide"))
async def cbguide(_, query: CallbackQuery):
await query.edit_message_text(
f"""❓ HOW TO USE THIS BOT:
1.) first, add me to your group.
2.) then promote me as admin and give all permissions except anonymous admin.
3.) add @{Veez.ASSISTANT_NAME } to your group.
4.) turn on the voice chat first before start to stream video.
5.) type /vplay (reply to video) to start streaming.
6.) type /vstop to end the video streaming.
📝 **note: stream & stop command can only be executed by group admin only!**
__Maintained by Veez Project Team__""",
reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(
"🏡 Go Back", callback_data="cbstart")
]]
))
@Client.on_callback_query(filters.regex("cbstart"))
async def cbstart(_, query: CallbackQuery):
await query.edit_message_text(
f"✨ **Hello there, I am a telegram group video streaming bot.**\n\n💭 **I was created to stream videos in group "
f"video chats easily.**\n\n❔ **To find out how to use me, please press the help button below** 👇🏻",
reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(
" Add me to your Group ", url=f"https://t.me/{Veez.BOT_USERNAME}?startgroup=true")
], [
InlineKeyboardButton(
"❔ HOW TO USE THIS BOT", callback_data="cbguide")
], [
InlineKeyboardButton(
"🌐 Terms & Condition", callback_data="cbinfo")
], [
InlineKeyboardButton(
"💬 Group", url=f"https://t.me/{Veez.GROUP_NAME}"),
InlineKeyboardButton(
"📣 Channel", url=f"https://t.me/{Veez.CHANNEL_NAME}")
], [
InlineKeyboardButton(
"🧙🏻‍♂️ Owner", url=f"https://t.me/{Veez.OWNER_NAME}")
], [
InlineKeyboardButton(
"📚 All Command List", callback_data="cblist")
]]
))
@Client.on_callback_query(filters.regex("cbinfo"))
async def cbinfo(_, query: CallbackQuery):
await query.edit_message_text(
f"""🌐 **bot information !**
🤖 __This bot was created to stream video in telegram group video chats using several methods from WebRTC.__
💡 __Powered by PyTgcalls the Async client API for the Telegram Group Calls, and Pyrogram the telegram MTProto API
Client Library and Framework in Pure Python for Users and Bots.__
👨🏻💻 __Thanks to the developers who participated in the development of this bot, the list of devs can be seen below:__
👩🏻 » [Levina Shavila](https://github.com/levina-lab)
🤵🏻 » [Sammy-XD](https://github.com/Sammy-XD)
🤵🏻 » [Zxce3](https://github.com/Zxce3)
🤵🏻 » [Tofik Denianto](https://github.com/tofikdn)
🤵🏻 » [Shohih Abdul](https://github.com/DoellBarr)
__This bot licensed under GNU-GPL 3.0 License__""",
reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(
"🏡 Go Back", callback_data="cbstart")
]]
),
disable_web_page_preview=True
)
@Client.on_callback_query(filters.regex("cblist"))
async def cblist(_, query: CallbackQuery):
await query.edit_message_text(
f"""📚 All Command List:
» /vplay (reply to video or yt/live url) - to stream video
» /vstop - stop the video streaming
» /song (song name) - download song from YT
» /vsong (video name) - download video from YT
» /lyric (song name) - lyric scrapper
» /vjoin - invite assistant join to your group
» /vleave - order assistant leave from your group
🎊 FUN CMD:
» /asupan - check it by yourself
» /chika - check it by yourself
» /wibu - check it by yourself
» /truth - check it by yourself
» /dare - check it by yourself
🔰 EXTRA CMD:
» /tts (reply to text) - text to speech
» /alive - check bot alive status
» /ping - check bot ping status
» /uptime - check bot uptime status
» /sysinfo - check bot system information
💡 SUDO ONLY:
» /rmd - remove all downloaded files
» /rmw - remove all downloaded raw files
» /leaveall - order assistant leave from all group
__Maintained by Veez Project Team__""",
reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(
"🏡 Go Back", callback_data="cbstart")
]]
))
@Client.on_callback_query(filters.regex("cls"))
async def close(_, query: CallbackQuery):
await query.message.delete()

View File

@ -1,69 +0,0 @@
# Copyright (C) 2021 Veez Project
from pyrogram import Client, errors
from pyrogram.types import (
InlineQuery,
InlineQueryResultArticle,
InputTextMessageContent,
)
from youtubesearchpython import VideosSearch
@Client.on_inline_query()
async def inline(client: Client, query: InlineQuery):
answers = []
search_query = query.query.lower().strip().rstrip()
if search_query == "menu":
await client.answer_inline_query(
query.id,
results=menus,
switch_pm_text="Menu",
switch_pm_parameter="help",
cache_time=0,
)
if search_query == "":
await client.answer_inline_query(
query.id,
results=answers,
switch_pm_text="search a youtube video",
switch_pm_parameter="help",
cache_time=0,
)
else:
search = VideosSearch(search_query, limit=50)
for result in search.result()["result"]:
answers.append(
InlineQueryResultArticle(
title=result["title"],
description="{}, {}.".format(
result["duration"], result["viewCount"]["short"]
),
input_message_content=InputTextMessageContent(
"/vplay https://www.youtube.com/watch?v={}".format(result["id"])
),
thumb_url=result["thumbnails"][0]["url"],
)
)
try:
await query.answer(results=answers, cache_time=0)
except errors.QueryIdInvalid:
await query.answer(
results=answers,
cache_time=0,
switch_pm_text="Error: search timed out",
switch_pm_parameter="",
)
# ==================
# Tested
menus = [
InlineQueryResultArticle(title="Start", description="Start a bot",
input_message_content=InputTextMessageContent("/start")),
InlineQueryResultArticle(title="Info Bot", description="Info about this bot",
input_message_content=InputTextMessageContent("/info")),
]

View File

@ -1,47 +0,0 @@
# Copyright (C) 2021 By VeezMusicProject
import os
from pyrogram import Client, filters
from pyrogram.types import Message
from helpers.filters import command
from helpers.decorators import sudo_users_only, errors
downloads = os.path.realpath("bot/downloads")
raw = os.path.realpath(".")
@Client.on_message(command(["rmd", "cleardl"]) & ~filters.edited)
@errors
@sudo_users_only
async def clear_downloads(_, message: Message):
ls_dir = os.listdir(downloads)
if ls_dir:
for file in os.listdir(downloads):
os.remove(os.path.join(downloads, file))
await message.reply_text("✅ **deleted all downloaded files**")
else:
await message.reply_text("❌ **no files downloaded**")
@Client.on_message(command(["clean", "wipe", "rmw"]) & ~filters.edited)
@errors
@sudo_users_only
async def clear_raw(_, message: Message):
ls_dir = os.listdir(raw)
if ls_dir:
for file in os.listdir(raw):
if file.endswith('.raw'):
os.remove(os.path.join(raw, file))
await message.reply_text("✅ **deleted all raw files**")
else:
await message.reply_text("❌ **no raw files**")
@Client.on_message(command(["dahlah"]) & ~filters.edited)
# edit if u want
async def haduhh(_, message: Message):
pth = os.path.realpath(".")
ls_dir = os.listdir(pth)
if ls_dir:
for dta in os.listdir(pth):
os.system("rm -rf *.raw *.jpg")
await message.reply_text("Oke")
else:
await message.reply_text("tadi udah")

View File

@ -1,263 +0,0 @@
from __future__ import unicode_literals
import asyncio
import math
import os
import time
from random import randint
from urllib.parse import urlparse
import aiofiles
import aiohttp
import wget
import yt_dlp
from pyrogram import Client, filters
from pyrogram.errors import FloodWait, MessageNotModified
from pyrogram.types import Message
from youtube_search import YoutubeSearch
from yt_dlp import YoutubeDL
from config import BOT_USERNAME as bn
from helpers.decorators import humanbytes
from helpers.filters import command
ydl_opts = {
'format':'best',
'keepvideo':True,
'prefer_ffmpeg':False,
'geo_bypass':True,
'outtmpl':'%(title)s.%(ext)s',
'quite':True
}
@Client.on_message(command(["song", f"song@{bn}"]) & ~filters.edited)
def song(_, message):
query = " ".join(message.command[1:])
m = message.reply("🔎 finding song...")
ydl_ops = {"format": "bestaudio[ext=m4a]"}
try:
results = YoutubeSearch(query, max_results=1).to_dict()
link = f"https://youtube.com{results[0]['url_suffix']}"
title = results[0]["title"][:40]
thumbnail = results[0]["thumbnails"][0]
thumb_name = f"thumb{title}.jpg"
thumb = requests.get(thumbnail, allow_redirects=True)
open(thumb_name, "wb").write(thumb.content)
duration = results[0]["duration"]
except Exception as e:
m.edit("❌ song not found.\n\nplease give a valid song name.")
print(str(e))
return
m.edit("📥 downloading...")
try:
with yt_dlp.YoutubeDL(ydl_ops) as ydl:
info_dict = ydl.extract_info(link, download=False)
audio_file = ydl.prepare_filename(info_dict)
ydl.process_info(info_dict)
rep = f"**🎧 Uploader @{bn}**"
secmul, dur, dur_arr = 1, 0, duration.split(":")
for i in range(len(dur_arr) - 1, -1, -1):
dur += int(float(dur_arr[i])) * secmul
secmul *= 60
message.reply_audio(
audio_file,
caption=rep,
thumb=thumb_name,
parse_mode="md",
title=title,
duration=dur,
)
m.delete()
except Exception as e:
m.edit("❌ error, wait for bot owner to fix")
print(e)
try:
os.remove(audio_file)
os.remove(thumb_name)
except Exception as e:
print(e)
def get_text(message: Message) -> [None, str]:
text_to_return = message.text
if message.text is None:
return None
if " " not in text_to_return:
return None
try:
return message.text.split(None, 1)[1]
except IndexError:
return None
async def progress(current, total, message, start, type_of_ps, file_name=None):
now = time.time()
diff = now - start
if round(diff % 10.00) == 0 or current == total:
percentage = current * 100 / total
speed = current / diff
elapsed_time = round(diff) * 1000
if elapsed_time == 0:
return
time_to_completion = round((total - current) / speed) * 1000
estimated_total_time = elapsed_time + time_to_completion
progress_str = "{0}{1} {2}%\n".format(
"".join("🔴" for _ in range(math.floor(percentage / 10))),
"".join("🔘" for _ in range(10 - math.floor(percentage / 10))),
round(percentage, 2),
)
tmp = progress_str + "{0} of {1}\nETA: {2}".format(
humanbytes(current), humanbytes(total), time_formatter(estimated_total_time)
)
if file_name:
try:
await message.edit(
"{}\n**File Name:** `{}`\n{}".format(type_of_ps, file_name, tmp)
)
except FloodWait as e:
await asyncio.sleep(e.x)
except MessageNotModified:
pass
else:
try:
await message.edit("{}\n{}".format(type_of_ps, tmp))
except FloodWait as e:
await asyncio.sleep(e.x)
except MessageNotModified:
pass
def get_user(message: Message, text: str) -> [int, str, None]:
asplit = None if text is None else text.split(" ", 1)
user_s = None
reason_ = None
if message.reply_to_message:
user_s = message.reply_to_message.from_user.id
reason_ = text or None
elif asplit is None:
return None, None
elif len(asplit[0]) > 0:
user_s = int(asplit[0]) if asplit[0].isdigit() else asplit[0]
if len(asplit) == 2:
reason_ = asplit[1]
return user_s, reason_
def get_readable_time(seconds: int) -> str:
count = 0
ping_time = ""
time_list = []
time_suffix_list = ["s", "m", "h", "days"]
while count < 4:
count += 1
remainder, result = divmod(seconds, 60) if count < 3 else divmod(seconds, 24)
if seconds == 0 and remainder == 0:
break
time_list.append(int(result))
seconds = int(remainder)
for x in range(len(time_list)):
time_list[x] = str(time_list[x]) + time_suffix_list[x]
if len(time_list) == 4:
ping_time += time_list.pop() + ", "
time_list.reverse()
ping_time += ":".join(time_list)
return ping_time
def time_formatter(milliseconds: int) -> str:
seconds, milliseconds = divmod(int(milliseconds), 1000)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
tmp = (
((str(days) + " day(s), ") if days else "")
+ ((str(hours) + " hour(s), ") if hours else "")
+ ((str(minutes) + " minute(s), ") if minutes else "")
+ ((str(seconds) + " second(s), ") if seconds else "")
+ ((str(milliseconds) + " millisecond(s), ") if milliseconds else "")
)
return tmp[:-2]
def get_file_extension_from_url(url):
url_path = urlparse(url).path
basename = os.path.basename(url_path)
return basename.split(".")[-1]
async def download_song(url):
song_name = f"{randint(6969, 6999)}.mp3"
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
if resp.status == 200:
f = await aiofiles.open(song_name, mode="wb")
await f.write(await resp.read())
await f.close()
return song_name
is_downloading = False
def time_to_seconds(times):
stringt = str(times)
return sum(int(x) * 60 ** i for i, x in enumerate(reversed(stringt.split(":"))))
@Client.on_message(
command(["vsong", f"vsong@{bn}", "video", f"video@{bn}"]) & ~filters.edited
)
async def vsong(client, message):
ydl_opts = {
"format": "best",
"keepvideo": True,
"prefer_ffmpeg": False,
"geo_bypass": True,
"outtmpl": "%(title)s.%(ext)s",
"quite": True,
}
query = " ".join(message.command[1:])
try:
results = YoutubeSearch(query, max_results=1).to_dict()
link = f"https://youtube.com{results[0]['url_suffix']}"
title = results[0]["title"][:40]
thumbnail = results[0]["thumbnails"][0]
thumb_name = f"{title}.jpg"
thumb = requests.get(thumbnail, allow_redirects=True)
open(thumb_name, "wb").write(thumb.content)
results[0]["duration"]
results[0]["url_suffix"]
results[0]["views"]
message.from_user.mention
except Exception as e:
print(e)
try:
msg = await message.reply("📥 **downloading video...**")
with YoutubeDL(ydl_opts) as ytdl:
ytdl_data = ytdl.extract_info(link, download=True)
file_name = ytdl.prepare_filename(ytdl_data)
except Exception as e:
return await msg.edit(f"🚫 **error:** {e}")
preview = wget.download(thumbnail)
await msg.edit("📤 **uploading video...**")
await message.reply_video(
file_name,
duration=int(ytdl_data["duration"]),
thumb=preview,
caption=ytdl_data["title"],
)
try:
os.remove(file_name)
await msg.delete()
except Exception as e:
print(e)

View File

@ -1,125 +0,0 @@
# Copyright (C) 2021 By VeezMusicProject
from datetime import datetime
from time import time
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup
from config import Veez
from helpers.decorators import sudo_users_only
from helpers.filters import command
START_TIME = datetime.utcnow()
START_TIME_ISO = START_TIME.replace(microsecond=0).isoformat()
TIME_DURATION_UNITS = (
('week', 60 * 60 * 24 * 7),
('day', 60 * 60 * 24),
('hour', 60 * 60),
('min', 60),
('sec', 1)
)
async def _human_time_duration(seconds):
if seconds == 0:
return 'inf'
parts = []
for unit, div in TIME_DURATION_UNITS:
amount, seconds = divmod(int(seconds), div)
if amount > 0:
parts.append('{} {}{}'
.format(amount, unit, "" if amount == 1 else "s"))
return ', '.join(parts)
@Client.on_message(command(["start", f"start@{Veez.BOT_USERNAME}"]))
async def start(_, m: Message):
if m.chat.type == "private":
await m.reply_text(
f"✨ **Hello there, I am a telegram group video streaming bot.**\n\n💭 **I was created to stream videos in group "
f"video chats easily.**\n\n❔ **To find out how to use me, please press the help button below** 👇🏻",
reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(
" Add me to your Group ", url=f"https://t.me/{Veez.BOT_USERNAME}?startgroup=true")
], [
InlineKeyboardButton(
"❔ HOW TO USE THIS BOT", callback_data="cbguide")
], [
InlineKeyboardButton(
"🌐 Terms & Condition", callback_data="cbinfo")
], [
InlineKeyboardButton(
"💬 Group", url="https://t.me/VeezSupportGroup"),
InlineKeyboardButton(
"📣 Channel", url="https://t.me/levinachannel")
], [
InlineKeyboardButton(
"👩🏻‍💻 Developer", url="https://t.me/dlwrml")
], [
InlineKeyboardButton(
"📚 All Command List", callback_data="cblist")
]]
))
else:
await m.reply_text("**✨ bot is online now ✨**",
reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(
"❔ HOW TO USE THIS BOT", callback_data="cbguide")
], [
InlineKeyboardButton(
"🌐 Search Youtube", switch_inline_query='')
], [
InlineKeyboardButton(
"📚 Command List", callback_data="cblist")
]]
)
)
@Client.on_message(command(["alive", f"alive@{Veez.BOT_USERNAME}"]) & filters.group & ~filters.edited)
async def alive(_, m: Message):
current_time = datetime.utcnow()
uptime_sec = (current_time - START_TIME).total_seconds()
uptime = await _human_time_duration(int(uptime_sec))
await m.reply_text(
f"""✅ **bot is running**\n<b>💠 **uptime:**</b> `{uptime}`""",
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
"✨ Group", url=f"https://t.me/VeezSupportGroup"
),
InlineKeyboardButton(
"📣 Channel", url=f"https://t.me/levinachannel"
)
]
]
)
)
@Client.on_message(command(["ping", f"ping@{Veez.BOT_USERNAME}"]) & ~filters.edited)
async def ping_pong(_, m: Message):
sturt = time()
m_reply = await m.reply_text("pinging...")
delta_ping = time() - sturt
await m_reply.edit_text(
"🏓 `PONG!!`\n"
f"⚡️ `{delta_ping * 1000:.3f} ms`"
)
@Client.on_message(command(["uptime", f"uptime@{Veez.BOT_USERNAME}"]) & ~filters.edited)
@sudo_users_only
async def get_uptime(_, m: Message):
current_time = datetime.utcnow()
uptime_sec = (current_time - START_TIME).total_seconds()
uptime = await _human_time_duration(int(uptime_sec))
await m.reply_text(
"🤖 bot status 🤖\n\n"
f"• **uptime:** `{uptime}`\n"
f"• **start time:** `{START_TIME_ISO}`"
)

View File

@ -1,53 +0,0 @@
# Copyright (C) 2021 Veez Project
import platform
import re
import socket
import uuid
import psutil
from pyrogram import Client, filters
from config import Veez
from helpers.decorators import sudo_users_only, humanbytes
from helpers.filters import command
# FETCH SYSINFO
@Client.on_message(command(["sysinfo", f"sysinfo@{Veez.BOT_USERNAME}"]) & ~filters.edited)
@sudo_users_only
async def give_sysinfo(client, message):
splatform = platform.system()
platform_release = platform.release()
platform_version = platform.version()
architecture = platform.machine()
hostname = socket.gethostname()
ip_address = socket.gethostbyname(socket.gethostname())
mac_address = ":".join(re.findall("..", "%012x" % uuid.getnode()))
processor = platform.processor()
ram = humanbytes(round(psutil.virtual_memory().total))
cpu_freq = psutil.cpu_freq().current
if cpu_freq >= 1000:
cpu_freq = f"{round(cpu_freq / 1000, 2)}GHz"
else:
cpu_freq = f"{round(cpu_freq, 2)}MHz"
du = psutil.disk_usage(client.workdir)
psutil.disk_io_counters()
disk = f"{humanbytes(du.used)} / {humanbytes(du.total)} " f"({du.percent}%)"
cpu_len = len(psutil.Process().cpu_affinity())
somsg = f"""**🖥 SYSTEM INFO**
**PlatForm :** `{splatform}`
**PlatForm - Release :** `{platform_release}`
**PlatFork - Version :** `{platform_version}`
**Architecture :** `{architecture}`
**Hostname :** `{hostname}`
**IP :** `{ip_address}`
**Mac :** `{mac_address}`
**Processor :** `{processor}`
**Ram : ** `{ram}`
**CPU :** `{cpu_len}`
**CPU FREQ :** `{cpu_freq}`
**DISK :** `{disk}`
"""
await message.reply(somsg)

View File

@ -1,88 +0,0 @@
import asyncio
from helpers.filters import command
from pyrogram import Client, filters
from config import BOT_USERNAME, SUDO_USERS
from pyrogram.errors import UserAlreadyParticipant
from helpers.decorators import authorized_users_only, errors
@Client.on_message(
command(["userbotjoin", f"userbotjoin@{BOT_USERNAME}"]) & ~filters.private & ~filters.bot
)
@authorized_users_only
@errors
async def addchannel(client, message):
chid = message.chat.id
try:
invitelink = await client.export_chat_invite_link(chid)
except:
await message.reply_text(
"<b>• **i'm not have permission:**\n\n» ❌ __Add Users__</b>",
)
return
try:
user = await USER.get_me()
except:
user.first_name = "music assistant"
try:
await USER.join_chat(invitelink)
await USER.send_message(
message.chat.id, "🤖: i'm joined here for playing music on voice chat"
)
except UserAlreadyParticipant:
await message.reply_text(
f"<b>✅ userbot already joined chat</b>",
)
except Exception as e:
print(e)
await message.reply_text(
f"<b>🛑 Flood Wait Error 🛑 \n\n User {user.first_name} couldn't join your group due to heavy join requests for userbot."
"\n\nor manually add assistant to your Group and try again</b>",
)
return
await message.reply_text(
f"<b>✅ userbot successfully joined chat</b>",
)
@Client.on_message(
command(["userbotleave", f"userbotleave@{BOT_USERNAME}"]) & filters.group & ~filters.edited
)
@authorized_users_only
async def rem(client, message):
try:
await USER.send_message(message.chat.id, "✅ userbot successfully left chat")
await USER.leave_chat(message.chat.id)
except:
await message.reply_text(
"<b>user couldn't leave your group, may be floodwaits.\n\nor manually kick me from your group</b>"
)
return
@Client.on_message(command(["leaveall", f"leaveall@{BOT_USERNAME}"]))
async def bye(client, message):
if message.from_user.id not in SUDO_USERS:
return
left = 0
failed = 0
lol = await message.reply("🔄 **userbot** leaving all chats !")
async for dialog in USER.iter_dialogs():
try:
await USER.leave_chat(dialog.chat.id)
left += 1
await lol.edit(
f"Userbot leaving all group...\n\nLeft: {left} chats.\nFailed: {failed} chats."
)
except:
failed += 1
await lol.edit(
f"Userbot leaving...\n\nLeft: {left} chats.\nFailed: {failed} chats."
)
await asyncio.sleep(0.7)
await client.send_message(
message.chat.id, f"Left {left} chats.\nFailed {failed} chats."
)

View File

@ -1,36 +0,0 @@
import logging
from pyrogram import Client
from config import BOT_USERNAME
from pyrogram.types import Message
from helpers.filters import command
from youtube_search import YoutubeSearch
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
logging.getLogger("pyrogram").setLevel(logging.WARNING)
@Client.on_message(command(["search", f"search@{BOT_USERNAME}"]))
async def ytsearch(_, message: Message):
try:
if len(message.command) < 2:
await message.reply_text("/search **needs an argument !**")
return
query = message.text.split(None, 1)[1]
m = await message.reply_text("🔎 **Searching...**")
results = YoutubeSearch(query, max_results=5).to_dict()
i = 0
text = ""
while i < 5:
text += f"🏷 **Name:** __{results[i]['title']}__\n"
text += f"⏱ **Duration:** `{results[i]['duration']}`\n"
text += f"👀 **Views:** `{results[i]['views']}`\n"
text += f"📣 **Channel:** {results[i]['channel']}\n"
text += f"🔗: https://www.youtube.com{results[i]['url_suffix']}\n\n"
i += 1
await m.edit(text, disable_web_page_preview=True)
except Exception as e:
await m.edit(str(e))