195 lines
6.1 KiB
Python
195 lines
6.1 KiB
Python
# Copyright (C) 2021 By Veez Music-Project
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import asyncio
|
|
import lyricsgenius
|
|
|
|
import aiofiles
|
|
import aiohttp
|
|
import requests
|
|
import wget
|
|
import yt_dlp
|
|
from pyrogram import Client, filters
|
|
from pyrogram.errors import FloodWait, MessageNotModified
|
|
from pyrogram.types import Message
|
|
from youtube_search import YoutubeSearch
|
|
from youtubesearchpython import VideosSearch
|
|
from yt_dlp import YoutubeDL
|
|
|
|
from config import BOT_USERNAME as bn
|
|
from driver.filters import command, other_filters
|
|
from driver.database.dbpunish import is_gbanned_user
|
|
|
|
|
|
# Module-level flag shared by the song and video handlers: each handler
# refuses to start while it is True, sets it to True before downloading and
# sets it back to False once its upload has been sent.
is_downloading = False
|
|
|
|
|
|
@Client.on_message(command(["song", f"song@{bn}"]) & ~filters.edited)
async def song_downloader(_, message):
    """Handle ``/song <query>``: find a track on YouTube and send its audio.

    The handler searches YouTube for the text following the command, saves
    the first result's thumbnail, downloads the m4a audio stream with yt-dlp
    and replies with the audio file. Progress is reported by editing a status
    message, which is deleted after a successful upload.

    Only one download runs at a time, guarded by the module-level
    ``is_downloading`` flag. The flag is always released and temporary files
    are always removed, even when the search, download or upload fails.

    Args:
        _: The client instance passed by the dispatcher (unused).
        message: The incoming command message.
    """
    global is_downloading

    user_id = message.from_user.id
    # is_gbanned_user is a coroutine function; without ``await`` the returned
    # coroutine object is always truthy and every user would be rejected.
    if await is_gbanned_user(user_id):
        await message.reply("❗️ **You've blocked from using this bot!**")
        return

    query = " ".join(message.command[1:])
    if is_downloading:
        await message.reply(
            "» Other download is in progress, please try again after some time !"
        )
        return

    is_downloading = True
    audio_file = None
    thumb_name = None
    loop = asyncio.get_event_loop()

    ydl_ops = {
        'format': 'bestaudio[ext=m4a]',
        'keepvideo': True,
        'prefer_ffmpeg': False,
        'geo_bypass': True,
        'outtmpl': '%(title)s.%(ext)s',
        'quiet': True,  # was misspelled 'quite', which yt-dlp ignores
    }

    def _search():
        # Blocking HTTP search; executed in a worker thread.
        return YoutubeSearch(query, max_results=1).to_dict()

    def _save_thumbnail(url, path):
        # Blocking download; the context manager closes the file handle.
        response = requests.get(url, allow_redirects=True)
        with open(path, "wb") as thumb_file:
            thumb_file.write(response.content)

    def _download(url):
        # Blocking yt-dlp run; returns the metadata and the output path.
        with yt_dlp.YoutubeDL(ydl_ops) as ydl:
            info = ydl.extract_info(url, download=False)
            path = ydl.prepare_filename(info)
            ydl.process_info(info)
        return info, path

    try:
        m = await message.reply("🔎 finding song...")

        try:
            results = await loop.run_in_executor(None, _search)
            first = results[0]
            link = f"https://youtube.com{first['url_suffix']}"
            title = first["title"][:40]
            thumbnail = first["thumbnails"][0]
            # Titles may contain characters that are invalid in file names.
            safe_title = re.sub(r'[\\/:*?"<>|]', "_", title)
            thumb_name = f"{safe_title}.jpg"
            await loop.run_in_executor(None, _save_thumbnail, thumbnail, thumb_name)
            duration = first["duration"]
        except Exception as e:
            await m.edit("❌ song not found.\n\n» Give me a valid song name !")
            print(str(e))
            return

        await m.edit("📥 downloading song...")
        try:
            info_dict, audio_file = await loop.run_in_executor(None, _download, link)
            rep = f"• uploader @{bn}"
            host = str(info_dict["uploader"])
            # Convert "H:MM:SS" / "M:SS" into seconds; str() tolerates a
            # non-string duration value.
            dur = 0
            for part in str(duration).split(":"):
                dur = dur * 60 + int(float(part))
            await m.edit("📤 uploading song...")
            await message.reply_audio(
                audio_file,
                caption=rep,
                performer=host,
                thumb=thumb_name,
                parse_mode="md",
                title=title,
                duration=dur,
            )
            await m.delete()
        except Exception as e:
            await m.edit("❌ error, wait for bot owner to fix")
            print(e)
    finally:
        # Always release the guard and clean up, whatever happened above.
        is_downloading = False
        for path in (audio_file, thumb_name):
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as e:
                    print(e)
|
|
|
|
|
|
@Client.on_message(
    command(["vsong", f"vsong@{bn}", "video", f"video@{bn}"]) & ~filters.edited
)
async def video_downloader(_, message):
    """Handle ``/vsong`` / ``/video <query>``: download a video and send it.

    The handler searches YouTube for the text following the command,
    downloads the first result with yt-dlp together with its thumbnail, and
    replies with the video file. Progress is reported through a status
    message, which is deleted after a successful upload.

    Only one download runs at a time, guarded by the module-level
    ``is_downloading`` flag. The flag is always released and temporary files
    are always removed, even when the search, download or upload fails.

    Args:
        _: The client instance passed by the dispatcher (unused).
        message: The incoming command message.
    """
    global is_downloading

    user_id = message.from_user.id
    if await is_gbanned_user(user_id):
        await message.reply_text("❗️ **You've blocked from using this bot!**")
        return

    ydl_opts = {
        "format": "best",
        "keepvideo": True,
        "prefer_ffmpeg": False,
        "geo_bypass": True,
        "outtmpl": "%(title)s.%(ext)s",
        "quiet": True,  # was misspelled "quite", which yt-dlp ignores
    }
    query = " ".join(message.command[1:])
    if is_downloading:
        return await message.reply(
            "» Other download is in progress, please try again after some time !"
        )

    is_downloading = True
    file_name = None
    thumb_name = None
    loop = asyncio.get_event_loop()

    def _search():
        # Blocking HTTP search; executed in a worker thread.
        return YoutubeSearch(query, max_results=1).to_dict()

    def _save_thumbnail(url, path):
        # Blocking download; the context manager closes the file handle.
        response = requests.get(url, allow_redirects=True)
        with open(path, "wb") as thumb_file:
            thumb_file.write(response.content)

    def _download(url):
        # Blocking yt-dlp run; returns the metadata and the output path.
        with YoutubeDL(ydl_opts) as ytdl:
            data = ytdl.extract_info(url, download=True)
            return data, ytdl.prepare_filename(data)

    try:
        msg = await message.reply("📥 downloading video...")
        try:
            results = await loop.run_in_executor(None, _search)
            first = results[0]
            link = f"https://youtube.com{first['url_suffix']}"
            title = first["title"][:40]
            thumbnail = first["thumbnails"][0]
            # Titles may contain characters that are invalid in file names.
            safe_title = re.sub(r'[\\/:*?"<>|]', "_", title)
            thumb_name = f"{safe_title}.jpg"
            await loop.run_in_executor(None, _save_thumbnail, thumbnail, thumb_name)
            ytdl_data, file_name = await loop.run_in_executor(None, _download, link)
        except Exception as e:
            # Search and download failures are reported the same way, with
            # the real cause instead of a follow-up NameError.
            return await msg.edit(f"🚫 error: `{e}`")

        await msg.edit("📤 uploading video...")
        await message.reply_video(
            file_name,
            # yt-dlp may report no duration (e.g. live streams).
            duration=int(ytdl_data.get("duration") or 0),
            # Reuse the thumbnail already saved above instead of fetching
            # it a second time.
            thumb=thumb_name,
            caption=ytdl_data["title"],
        )
        await msg.delete()
    finally:
        # Always release the guard and clean up, whatever happened above.
        is_downloading = False
        for path in (file_name, thumb_name):
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as e:
                    print(e)
|
|
|
|
|
|
@Client.on_message(command(["lyric", f"lyric@{bn}", "lyrics"]))
async def get_lyric_genius(_, message: Message):
    """Handle ``/lyric`` / ``/lyrics <song name>``: look up lyrics on Genius.

    Replies with the artist name and lyrics of the best match. When the
    formatted text is longer than 4096 characters it is sent as a
    ``lyrics.txt`` document instead of a message edit.

    Args:
        _: The client instance passed by the dispatcher (unused).
        message: The incoming command message.
    """
    user_id = message.from_user.id
    if await is_gbanned_user(user_id):
        await message.reply_text("❗️ **You've blocked from using this bot!**")
        return
    if len(message.command) < 2:
        return await message.reply_text("**usage:**\n\n/lyrics (song name)")

    m = await message.reply_text("🔍 Searching lyrics...")
    query = message.text.split(None, 1)[1]

    # SECURITY NOTE: an API token is hard-coded below. Prefer supplying it
    # via the GENIUS_ACCESS_TOKEN environment variable; the literal is kept
    # only as a backward-compatible fallback and should be rotated/removed.
    token = os.environ.get(
        "GENIUS_ACCESS_TOKEN",
        "OXaVabSRKQLqwpiYOn-E4Y7k3wj-TNdL5RfDPXlnXhCErbcqVvdCF-WnMR5TBctI",
    )
    genius = lyricsgenius.Genius(token)
    genius.verbose = False

    loop = asyncio.get_event_loop()
    try:
        # search_song performs blocking HTTP requests; run it in a worker
        # thread so the event loop stays responsive.
        song = await loop.run_in_executor(
            None, lambda: genius.search_song(query, get_full_info=False)
        )
    except Exception as e:
        # A failed lookup is reported like a miss rather than leaving the
        # "Searching" status message up forever.
        print(e)
        song = None

    if song is None:
        return await m.edit("❌ `404` lyrics not found")

    text = (
        f"\n**Song Name:** __{query}__"
        f"\n**Artist Name:** {song.artist}"
        f"\n**__Lyrics:__**"
        f"\n{song.lyrics}"
    )

    if len(text) > 4096:
        await m.delete()
        filename = "lyrics.txt"
        try:
            with open(filename, "w", encoding="utf8") as out_file:
                out_file.write(text.strip())
            await message.reply_document(
                document=filename,
                caption="**OUTPUT:**\n\n`Lyrics Text`",
                quote=False,
            )
        finally:
            # Remove the temporary file even if the upload fails.
            if os.path.exists(filename):
                os.remove(filename)
    else:
        await m.edit(text)
|