292 lines
9.3 KiB
Python
292 lines
9.3 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import asyncio
|
|
import math
|
|
import os
|
|
import time
|
|
from random import randint
|
|
from urllib.parse import urlparse
|
|
|
|
import aiofiles
|
|
import aiohttp
|
|
import requests
|
|
import youtube_dl
|
|
from pyrogram import Client, filters
|
|
from pyrogram.errors import FloodWait, MessageNotModified
|
|
from pyrogram.types import Message, InlineKeyboardButton, InlineKeyboardMarkup
|
|
from youtube_search import YoutubeSearch
|
|
|
|
from config import BOT_USERNAME
|
|
from helpers.filters import command
|
|
from helpers.decorators import humanbytes
|
|
|
|
|
|
@Client.on_message(command(["song", f"song@{BOT_USERNAME}"]) & ~filters.channel)
|
|
def song(_, message):
|
|
query = ""
|
|
for i in message.command[1:]:
|
|
query += " " + str(i)
|
|
print(query)
|
|
m = message.reply("🔎 finding song...")
|
|
ydl_ops = {"format": "bestaudio[ext=m4a]"}
|
|
try:
|
|
results = YoutubeSearch(query, max_results=1).to_dict()
|
|
link = f"https://youtube.com{results[0]['url_suffix']}"
|
|
title = results[0]["title"][:40]
|
|
thumbnail = results[0]["thumbnails"][0]
|
|
thumb_name = f"thumb{title}.jpg"
|
|
thumb = requests.get(thumbnail, allow_redirects=True)
|
|
open(thumb_name, "wb").write(thumb.content)
|
|
duration = results[0]["duration"]
|
|
|
|
except Exception as e:
|
|
m.edit("❌ song not found.\n\nplease give a valid song name.")
|
|
print(str(e))
|
|
return
|
|
m.edit("📥 downloading...")
|
|
try:
|
|
with youtube_dl.YoutubeDL(ydl_ops) as ydl:
|
|
info_dict = ydl.extract_info(link, download=False)
|
|
audio_file = ydl.prepare_filename(info_dict)
|
|
ydl.process_info(info_dict)
|
|
rep = f"**🎧 By @{BOT_USERNAME}**"
|
|
secmul, dur, dur_arr = 1, 0, duration.split(":")
|
|
for i in range(len(dur_arr) - 1, -1, -1):
|
|
dur += int(float(dur_arr[i])) * secmul
|
|
secmul *= 60
|
|
message.reply_audio(
|
|
audio_file,
|
|
caption=rep,
|
|
thumb=thumb_name,
|
|
parse_mode="md",
|
|
title=title,
|
|
duration=dur,
|
|
)
|
|
m.delete()
|
|
except Exception as e:
|
|
m.edit("❌ error, wait for dev to fix")
|
|
print(e)
|
|
|
|
try:
|
|
os.remove(audio_file)
|
|
os.remove(thumb_name)
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
|
|
def get_text(message: Message) -> [None, str]:
|
|
text_to_return = message.text
|
|
if message.text is None:
|
|
return None
|
|
if " " in text_to_return:
|
|
try:
|
|
return message.text.split(None, 1)[1]
|
|
except IndexError:
|
|
return None
|
|
else:
|
|
return None
|
|
|
|
|
|
async def progress(current, total, message, start, type_of_ps, file_name=None):
|
|
now = time.time()
|
|
diff = now - start
|
|
if round(diff % 10.00) == 0 or current == total:
|
|
percentage = current * 100 / total
|
|
speed = current / diff
|
|
elapsed_time = round(diff) * 1000
|
|
if elapsed_time == 0:
|
|
return
|
|
time_to_completion = round((total - current) / speed) * 1000
|
|
estimated_total_time = elapsed_time + time_to_completion
|
|
progress_str = "{0}{1} {2}%\n".format(
|
|
"".join(["🔴" for _ in range(math.floor(percentage / 10))]),
|
|
"".join(["🔘" for _ in range(10 - math.floor(percentage / 10))]),
|
|
round(percentage, 2),
|
|
)
|
|
tmp = progress_str + "{0} of {1}\nETA: {2}".format(
|
|
humanbytes(current), humanbytes(total), time_formatter(estimated_total_time)
|
|
)
|
|
if file_name:
|
|
try:
|
|
await message.edit(
|
|
"{}\n**File Name:** `{}`\n{}".format(type_of_ps, file_name, tmp)
|
|
)
|
|
except FloodWait as e:
|
|
await asyncio.sleep(e.x)
|
|
except MessageNotModified:
|
|
pass
|
|
else:
|
|
try:
|
|
await message.edit("{}\n{}".format(type_of_ps, tmp))
|
|
except FloodWait as e:
|
|
await asyncio.sleep(e.x)
|
|
except MessageNotModified:
|
|
pass
|
|
|
|
|
|
def get_user(message: Message, text: str) -> [int, str, None]:
|
|
if text is None:
|
|
asplit = None
|
|
else:
|
|
asplit = text.split(" ", 1)
|
|
user_s = None
|
|
reason_ = None
|
|
if message.reply_to_message:
|
|
user_s = message.reply_to_message.from_user.id
|
|
reason_ = text if text else None
|
|
elif asplit is None:
|
|
return None, None
|
|
elif len(asplit[0]) > 0:
|
|
user_s = int(asplit[0]) if asplit[0].isdigit() else asplit[0]
|
|
if len(asplit) == 2:
|
|
reason_ = asplit[1]
|
|
return user_s, reason_
|
|
|
|
|
|
def get_readable_time(seconds: int) -> str:
|
|
count = 0
|
|
ping_time = ""
|
|
time_list = []
|
|
time_suffix_list = ["s", "m", "h", "days"]
|
|
|
|
while count < 4:
|
|
count += 1
|
|
if count < 3:
|
|
remainder, result = divmod(seconds, 60)
|
|
else:
|
|
remainder, result = divmod(seconds, 24)
|
|
if seconds == 0 and remainder == 0:
|
|
break
|
|
time_list.append(int(result))
|
|
seconds = int(remainder)
|
|
|
|
for x in range(len(time_list)):
|
|
time_list[x] = str(time_list[x]) + time_suffix_list[x]
|
|
if len(time_list) == 4:
|
|
ping_time += time_list.pop() + ", "
|
|
|
|
time_list.reverse()
|
|
ping_time += ":".join(time_list)
|
|
|
|
return ping_time
|
|
|
|
|
|
def time_formatter(milliseconds: int) -> str:
|
|
seconds, milliseconds = divmod(int(milliseconds), 1000)
|
|
minutes, seconds = divmod(seconds, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
days, hours = divmod(hours, 24)
|
|
tmp = (
|
|
((str(days) + " day(s), ") if days else "")
|
|
+ ((str(hours) + " hour(s), ") if hours else "")
|
|
+ ((str(minutes) + " minute(s), ") if minutes else "")
|
|
+ ((str(seconds) + " second(s), ") if seconds else "")
|
|
+ ((str(milliseconds) + " millisecond(s), ") if milliseconds else "")
|
|
)
|
|
return tmp[:-2]
|
|
|
|
|
|
ydl_opts = {
|
|
"format": "bestaudio/best",
|
|
"writethumbnail": True,
|
|
"postprocessors": [
|
|
{
|
|
"key": "FFmpegExtractAudio",
|
|
"preferredcodec": "mp3",
|
|
"preferredquality": "192",
|
|
}
|
|
],
|
|
}
|
|
|
|
|
|
def get_file_extension_from_url(url):
|
|
url_path = urlparse(url).path
|
|
basename = os.path.basename(url_path)
|
|
return basename.split(".")[-1]
|
|
|
|
|
|
async def download_song(url):
|
|
song_name = f"{randint(6969, 6999)}.mp3"
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as resp:
|
|
if resp.status == 200:
|
|
f = await aiofiles.open(song_name, mode="wb")
|
|
await f.write(await resp.read())
|
|
await f.close()
|
|
return song_name
|
|
|
|
|
|
is_downloading = False
|
|
|
|
|
|
def time_to_seconds(times):
|
|
stringt = str(times)
|
|
return sum(int(x) * 60 ** i for i, x in enumerate(reversed(stringt.split(":"))))
|
|
|
|
|
|
@Client.on_message(command(["vsong", f"vsong@{BOT_USERNAME}"]) & filters.group & ~filters.edited)
|
|
async def vsong(_, message: Message):
|
|
query = ''
|
|
for i in message.command[1:]:
|
|
query += ' ' + str(i)
|
|
print(query)
|
|
k = await message.reply_text("🔎 **searching video...**")
|
|
ydl_ops = {
|
|
"format": "best[ext=mp4]",
|
|
"geo-bypass": True,
|
|
"nocheckcertificate": True,
|
|
"outtmpl": "downloads/%(id)s.%(ext)s",
|
|
}
|
|
try:
|
|
results = []
|
|
count = 0
|
|
while len(results) == 0 and count < 6:
|
|
if count > 0:
|
|
await asyncio.sleep(1)
|
|
results = YoutubeSearch(query, max_results=1).to_dict()
|
|
count += 1
|
|
try:
|
|
link = f"https://youtube.com{results[0]['url_suffix']}"
|
|
# print(results)
|
|
title = results[0]["title"]
|
|
thumbnail = results[0]["thumbnails"][0]
|
|
duration = int(float(results[0]["duration"]))
|
|
views = results[0]["views"]
|
|
thumb_name = f'thumb{message.message_id}.jpg'
|
|
thumb = requests.get(thumbnail, allow_redirects=True)
|
|
open(thumb_name, 'wb').write(thumb.content)
|
|
except Exception as e:
|
|
print(e)
|
|
await k.edit(
|
|
'❌ **video not found, please give a valid video name.\n\n» if you think this is an error report to '
|
|
'@VeezSupportGroup**')
|
|
return
|
|
except Exception as e:
|
|
await k.edit(
|
|
"💡 **please give a video name too you want to download.**\n\n» for example: `/vsong runaway`"
|
|
)
|
|
print(str(e))
|
|
return
|
|
await k.edit("📥 **downloading file...**")
|
|
try:
|
|
with youtube_dl.YoutubeDL(ydl_ops) as ydl:
|
|
info_dict = ydl.extract_info(link, download=False)
|
|
video_file = ydl.prepare_filename(info_dict)
|
|
ydl.process_info(info_dict)
|
|
caption = f"🏷 Name: {title}\n💡 Views: `{views}`\n🎧 Request by: {message.from_user.mention()}\n\n⚡ " \
|
|
f"__Powered by Veez Music A.I__ "
|
|
buttons = InlineKeyboardMarkup([[InlineKeyboardButton("🗑 Close", callback_data="cls")]])
|
|
await k.edit("📤 **uploading file...**")
|
|
await message.reply_video(video_file, caption=caption, duration=duration, thumb=thumb_name,
|
|
reply_markup=buttons, supports_streaming=True)
|
|
await k.delete()
|
|
except Exception as e:
|
|
await k.edit(f'❌ **something went wrong !** \n`{e}`')
|
|
pass
|
|
try:
|
|
os.remove(video_file)
|
|
os.remove(thumb_name)
|
|
except Exception as e:
|
|
print(e)
|
|
pass
|