# Copyright (C) 2021 By Veez Music-Project from __future__ import unicode_literals import asyncio import math import os import time from random import randint from urllib.parse import urlparse import aiofiles import aiohttp import requests import wget import yt_dlp from pyrogram import Client, filters from pyrogram.errors import FloodWait, MessageNotModified from pyrogram.types import Message from youtube_search import YoutubeSearch from yt_dlp import YoutubeDL from config import BOT_USERNAME as bn from driver.decorators import humanbytes from driver.filters import command, other_filters ydl_opts = { 'format': 'best', 'keepvideo': True, 'prefer_ffmpeg': False, 'geo_bypass': True, 'outtmpl': '%(title)s.%(ext)s', 'quite': True } @Client.on_message(command(["song", f"song@{bn}"]) & ~filters.edited) def song(_, message): query = " ".join(message.command[1:]) m = message.reply("๐Ÿ”Ž finding song...") ydl_ops = {"format": "bestaudio[ext=m4a]"} try: results = YoutubeSearch(query, max_results=1).to_dict() link = f"https://youtube.com{results[0]['url_suffix']}" title = results[0]["title"][:40] thumbnail = results[0]["thumbnails"][0] thumb_name = f"{title}.jpg" thumb = requests.get(thumbnail, allow_redirects=True) open(thumb_name, "wb").write(thumb.content) duration = results[0]["duration"] except Exception as e: m.edit("โŒ song not found.\n\nplease give a valid song name.") print(str(e)) return m.edit("๐Ÿ“ฅ downloading file...") try: with yt_dlp.YoutubeDL(ydl_ops) as ydl: info_dict = ydl.extract_info(link, download=False) audio_file = ydl.prepare_filename(info_dict) ydl.process_info(info_dict) rep = f"**๐ŸŽง Uploader @{bn}**" secmul, dur, dur_arr = 1, 0, duration.split(":") for i in range(len(dur_arr) - 1, -1, -1): dur += int(float(dur_arr[i])) * secmul secmul *= 60 m.edit("๐Ÿ“ค uploading file...") message.reply_audio( audio_file, caption=rep, thumb=thumb_name, parse_mode="md", title=title, duration=dur, ) m.delete() except Exception as e: m.edit("โŒ error, wait for bot owner to fix") print(e) try: os.remove(audio_file) os.remove(thumb_name) except Exception as e: print(e) def get_readable_time(seconds: int) -> str: count = 0 ping_time = "" time_list = [] time_suffix_list = ["s", "m", "h", "days"] while count < 4: count += 1 remainder, result = divmod( seconds, 60) if count < 3 else divmod( seconds, 24) if seconds == 0 and remainder == 0: break time_list.append(int(result)) seconds = int(remainder) for x in range(len(time_list)): time_list[x] = str(time_list[x]) + time_suffix_list[x] if len(time_list) == 4: ping_time += time_list.pop() + ", " time_list.reverse() ping_time += ":".join(time_list) return ping_time def time_formatter(milliseconds: int) -> str: seconds, milliseconds = divmod(int(milliseconds), 1000) minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) days, hours = divmod(hours, 24) tmp = ( ((str(days) + " day(s), ") if days else "") + ((str(hours) + " hour(s), ") if hours else "") + ((str(minutes) + " minute(s), ") if minutes else "") + ((str(seconds) + " second(s), ") if seconds else "") + ((str(milliseconds) + " millisecond(s), ") if milliseconds else "") ) return tmp[:-2] def get_file_extension_from_url(url): url_path = urlparse(url).path basename = os.path.basename(url_path) return basename.split(".")[-1] async def download_song(url): song_name = f"{randint(6969, 6999)}.mp3" async with aiohttp.ClientSession() as session: async with session.get(url) as resp: if resp.status == 200: f = await aiofiles.open(song_name, mode="wb") await f.write(await resp.read()) await f.close() return song_name def time_to_seconds(times): stringt = str(times) return sum( int(x) * 60 ** i for i, x in enumerate( reversed( stringt.split(":")))) @Client.on_message( command(["vsong", f"vsong@{bn}", "video", f"video@{bn}"]) & ~filters.edited ) async def vsong(client, message): ydl_opts = { "format": "best", "keepvideo": True, "prefer_ffmpeg": False, "geo_bypass": True, "outtmpl": "%(title)s.%(ext)s", "quite": True, } query = " ".join(message.command[1:]) try: results = YoutubeSearch(query, max_results=1).to_dict() link = f"https://youtube.com{results[0]['url_suffix']}" title = results[0]["title"][:40] thumbnail = results[0]["thumbnails"][0] thumb_name = f"{title}.jpg" thumb = requests.get(thumbnail, allow_redirects=True) open(thumb_name, "wb").write(thumb.content) results[0]["duration"] results[0]["url_suffix"] results[0]["views"] message.from_user.mention except Exception as e: print(e) try: msg = await message.reply("๐Ÿ“ฅ **downloading video...**") with YoutubeDL(ydl_opts) as ytdl: ytdl_data = ytdl.extract_info(link, download=True) file_name = ytdl.prepare_filename(ytdl_data) except Exception as e: return await msg.edit(f"๐Ÿšซ **error:** {e}") preview = wget.download(thumbnail) await msg.edit("๐Ÿ“ค **uploading video...**") await message.reply_video( file_name, duration=int(ytdl_data["duration"]), thumb=preview, caption=ytdl_data["title"], ) try: os.remove(file_name) await msg.delete() except Exception as e: print(e) @Client.on_message(command(["lyric", f"lyric@{bn}"])) async def lyrics(_, message): try: if len(message.command) < 2: await message.reply_text("ยป **give a lyric name too.**") return query = message.text.split(None, 1)[1] rep = await message.reply_text("๐Ÿ”Ž **searching lyrics...**") resp = requests.get( f"https://api-tede.herokuapp.com/api/lirik?l={query}" ).json() result = f"{resp['data']}" await rep.edit(result) except Exception: await rep.edit("โŒ **results of lyric not found.**\n\nยป **please give a valid song name.**")