video-stream/program/downloader.py

190 lines
5.7 KiB
Python
Raw Permalink Normal View History

2022-01-31 12:41:47 +00:00
# Copyright (C) 2021 By Veez Music-Project
from __future__ import unicode_literals
import os
import re
import math
import time
import asyncio
import lyricsgenius
from random import randint
from urllib.parse import urlparse
import aiofiles
import aiohttp
import requests
import wget
import yt_dlp
from pyrogram import Client, filters
from pyrogram.errors import FloodWait, MessageNotModified
from pyrogram.types import Message
from youtube_search import YoutubeSearch
from youtubesearchpython import VideosSearch
from yt_dlp import YoutubeDL
from config import BOT_USERNAME as bn
from driver.decorators import humanbytes
from driver.filters import command, other_filters
# Module-level yt-dlp options. The handlers visible in this file build their
# own option dicts, so this one is kept for any external importer.
ydl_opts = {
    'format': 'best',
    'keepvideo': True,
    'prefer_ffmpeg': False,
    'geo_bypass': True,
    'outtmpl': '%(title)s.%(ext)s',
    # Was spelled 'quite', which yt-dlp does not recognise as an option.
    'quiet': True,
}

# Process-wide "busy" flag read and written by the song/vsong handlers so
# that only one download runs at a time.
is_downloading = False
2022-01-31 12:41:47 +00:00
@Client.on_message(command(["song", f"song@{bn}"]) & ~filters.edited)
def song(_, message):
    """Search YouTube for the requested song and reply with its audio file.

    The text after the command is used as the search query. The first search
    result is downloaded with yt-dlp (m4a audio), uploaded as an audio
    message with the video thumbnail, and the temporary files are removed.

    Only one download may run at a time: the module-level ``is_downloading``
    flag is set while this handler works and is always cleared on exit,
    including on every failure path.

    Args:
        _: The client instance (unused).
        message: The incoming command message.
    """
    global is_downloading
    query = " ".join(message.command[1:])
    if is_downloading:
        message.reply(
            "» Other download is in progress, please try again after some time !"
        )
        return
    is_downloading = True
    audio_file = None
    thumb_name = None
    try:
        m = message.reply("🔎 finding song...")
        ydl_ops = {"format": "bestaudio[ext=m4a]"}
        try:
            results = YoutubeSearch(query, max_results=1).to_dict()
            link = f"https://youtube.com{results[0]['url_suffix']}"
            title = results[0]["title"][:40]
            thumbnail = results[0]["thumbnails"][0]
            duration = results[0]["duration"]
            thumb = requests.get(thumbnail, allow_redirects=True)
            # Titles may contain path separators or other characters that
            # are invalid in file names; replace them for the local file.
            safe_title = re.sub(r'[\\/:*?"<>|]', "_", title)
            thumb_name = f"{safe_title}.jpg"
            with open(thumb_name, "wb") as thumb_file:
                thumb_file.write(thumb.content)
        except Exception as e:
            m.edit("❌ song not found.\n\nplease give a valid song name !")
            print(str(e))
            return
        m.edit("📥 downloading song...")
        try:
            with yt_dlp.YoutubeDL(ydl_ops) as ydl:
                info_dict = ydl.extract_info(link, download=False)
                audio_file = ydl.prepare_filename(info_dict)
                ydl.process_info(info_dict)
            rep = f"• uploader @{bn}"
            host = str(info_dict["uploader"])
            # Convert "H:MM:SS" / "M:SS" / "SS" into a number of seconds.
            dur = 0
            for part in duration.split(":"):
                dur = dur * 60 + int(float(part))
            m.edit("📤 uploading song...")
            message.reply_audio(
                audio_file,
                caption=rep,
                performer=host,
                thumb=thumb_name,
                parse_mode="md",
                title=title,
                duration=dur,
            )
            m.delete()
        except Exception as e:
            m.edit("❌ error, wait for bot owner to fix")
            print(e)
    finally:
        # Always release the lock, otherwise one failed request would block
        # every later download until the bot is restarted.
        is_downloading = False
        # Remove each temporary file independently so that a failure on one
        # does not leave the other behind.
        for path in (audio_file, thumb_name):
            if path is None:
                continue
            try:
                os.remove(path)
            except OSError as e:
                print(e)
@Client.on_message(
    command(["vsong", f"vsong@{bn}", "video", f"video@{bn}"]) & ~filters.edited
)
async def vsong(client, message):
    """Search YouTube for the requested video and reply with the video file.

    The text after the command is used as the search query. The first search
    result is downloaded with yt-dlp, uploaded as a video message with the
    result's thumbnail, and the temporary files are removed.

    Only one download may run at a time: the module-level ``is_downloading``
    flag is set while this handler works and is always cleared on exit,
    including on every failure path.

    Args:
        client: The client instance (unused).
        message: The incoming command message.
    """
    global is_downloading
    ydl_opts = {
        "format": "best",
        "keepvideo": True,
        "prefer_ffmpeg": False,
        "geo_bypass": True,
        "outtmpl": "%(title)s.%(ext)s",
        # Was spelled "quite", which yt-dlp does not recognise as an option.
        "quiet": True,
    }
    query = " ".join(message.command[1:])
    if is_downloading:
        return await message.reply(
            "» Other download is in progress, please try again after some time !"
        )
    is_downloading = True
    file_name = None
    preview = None
    try:
        try:
            results = YoutubeSearch(query, max_results=1).to_dict()
            link = f"https://youtube.com{results[0]['url_suffix']}"
            thumbnail = results[0]["thumbnails"][0]
        except Exception as e:
            # Without a search result there is nothing to download; stop
            # here instead of failing later on an undefined link.
            print(e)
            return await message.reply(
                "❌ video not found.\n\nplease give a valid video name !"
            )
        msg = await message.reply("📥 downloading video...")
        try:
            with YoutubeDL(ydl_opts) as ytdl:
                ytdl_data = ytdl.extract_info(link, download=True)
                file_name = ytdl.prepare_filename(ytdl_data)
        except Exception as e:
            return await msg.edit(f"🚫 error: `{e}`")
        try:
            preview = wget.download(thumbnail)
        except Exception as e:
            # The thumbnail is optional; upload without it if it cannot be
            # fetched.
            print(e)
        await msg.edit("📤 uploading video...")
        await message.reply_video(
            file_name,
            duration=int(ytdl_data["duration"]),
            thumb=preview,
            caption=ytdl_data["title"],
        )
        try:
            await msg.delete()
        except Exception as e:
            print(e)
    finally:
        # Always release the lock, otherwise one failed request would block
        # every later download until the bot is restarted.
        is_downloading = False
        # Remove each temporary file independently so that a failure on one
        # does not leave the other behind.
        for path in (file_name, preview):
            if path is None:
                continue
            try:
                os.remove(path)
            except OSError as e:
                print(e)
@Client.on_message(command(["lyric", f"lyric@{bn}", "lyrics"]))
async def get_lyric_genius(_, message: Message):
    """Look up song lyrics on Genius and send them back to the chat.

    The text after the command is used as the search query. Lyrics that fit
    in a single message (up to 4096 characters) replace the "searching"
    status message; longer lyrics are sent as a ``lyrics.txt`` document.

    Args:
        _: The client instance (unused).
        message: The incoming command message.
    """
    import tempfile

    if len(message.command) < 2:
        return await message.reply_text("**usage:**\n\n/lyrics (song name)")
    m = await message.reply_text("🔍 Searching lyrics...")
    # The command may arrive as a media caption, in which case text is None.
    query = (message.text or message.caption).split(None, 1)[1]
    # NOTE(security): a hard-coded API token should not live in source
    # control. Prefer the GENIUS_ACCESS_TOKEN environment variable; the
    # literal is kept only as a backward-compatible fallback.
    token = os.environ.get(
        "GENIUS_ACCESS_TOKEN",
        "OXaVabSRKQLqwpiYOn-E4Y7k3wj-TNdL5RfDPXlnXhCErbcqVvdCF-WnMR5TBctI",
    )
    genius = lyricsgenius.Genius(token)
    genius.verbose = False
    try:
        found = genius.search_song(query, get_full_info=False)
    except Exception as e:
        # Report the failure instead of leaving the "searching" message
        # hanging forever.
        print(e)
        return await m.edit(f"🚫 error: `{e}`")
    if found is None:
        return await m.edit("❌ `404` lyrics not found")
    text = f"""
**Song Name:** __{query}__
**Artist Name:** {found.artist}
**__Lyrics:__**
{found.lyrics}"""
    if len(text) <= 4096:
        await m.edit(text)
        return
    await m.delete()
    # Write into a private temporary directory so concurrent requests do not
    # overwrite each other's file and the file is removed even when the
    # upload fails; the uploaded document keeps the name "lyrics.txt".
    with tempfile.TemporaryDirectory() as tmp_dir:
        filename = os.path.join(tmp_dir, "lyrics.txt")
        with open(filename, "w+", encoding="utf8") as out_file:
            out_file.write(text.strip())
        await message.reply_document(
            document=filename,
            caption="**OUTPUT:**\n\n`Lyrics Text`",
            quote=False,
        )