mirror of
https://github.com/omg-xtao/ytdlbot.git
synced 2024-11-16 03:45:23 +00:00
use new wrapper (#10)
This commit is contained in:
parent
9b60d20062
commit
5899216a00
2
.gitignore
vendored
2
.gitignore
vendored
@ -136,3 +136,5 @@ dmypy.json
|
||||
/.idea/misc.xml
|
||||
/.idea/workspace.xml
|
||||
/.idea/jsonSchemas.xml
|
||||
/demo.session
|
||||
/.idea/ytdlbot.iml
|
||||
|
8
.idea/.gitignore
vendored
Normal file
8
.idea/.gitignore
vendored
Normal file
@ -0,0 +1,8 @@
|
||||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
||||
# Datasource local storage ignored files
|
||||
/dataSources/
|
||||
/dataSources.local.xml
|
||||
# Editor-based HTTP Client requests
|
||||
/httpRequests/
|
@ -10,8 +10,8 @@ RUN apk update && apk add --no-cache ffmpeg
|
||||
COPY --from=builder /root/.local /usr/local
|
||||
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
|
||||
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
|
||||
COPY . /ytdl-bot
|
||||
COPY . /ytdlbot
|
||||
|
||||
WORKDIR /ytdl-bot
|
||||
WORKDIR /ytdlbot
|
||||
ENV TZ=Asia/Shanghai
|
||||
CMD ["python", "ytdl.py"]
|
309
FastTelethon.py
309
FastTelethon.py
@ -1,309 +0,0 @@
|
||||
# submodule: https://gist.github.com/painor/7e74de80ae0c819d3e9abcf9989a8dd6/a98abd5ff5cae3640e785611a38c0e213df56343
|
||||
# copied from https://github.com/tulir/mautrix-telegram/blob/master/mautrix_telegram/util/parallel_file_transfer.py
|
||||
# Copyright (C) 2021 Tulir Asokan
|
||||
import asyncio
|
||||
import hashlib
|
||||
import inspect
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
from collections import defaultdict
|
||||
from typing import Optional, List, AsyncGenerator, Union, Awaitable, DefaultDict, Tuple, BinaryIO
|
||||
|
||||
from telethon import utils, helpers, TelegramClient
|
||||
from telethon.crypto import AuthKey
|
||||
from telethon.network import MTProtoSender
|
||||
from telethon.tl.alltlobjects import LAYER
|
||||
from telethon.tl.functions import InvokeWithLayerRequest
|
||||
from telethon.tl.functions.auth import ExportAuthorizationRequest, ImportAuthorizationRequest
|
||||
from telethon.tl.functions.upload import (GetFileRequest, SaveFilePartRequest,
|
||||
SaveBigFilePartRequest)
|
||||
from telethon.tl.types import (Document, InputFileLocation, InputDocumentFileLocation,
|
||||
InputPhotoFileLocation, InputPeerPhotoFileLocation, TypeInputFile,
|
||||
InputFileBig, InputFile)
|
||||
|
||||
try:
|
||||
from mautrix.crypto.attachments import async_encrypt_attachment
|
||||
except ImportError:
|
||||
async_encrypt_attachment = None
|
||||
|
||||
log: logging.Logger = logging.getLogger("telethon")
|
||||
|
||||
TypeLocation = Union[Document, InputDocumentFileLocation, InputPeerPhotoFileLocation,
|
||||
InputFileLocation, InputPhotoFileLocation]
|
||||
|
||||
|
||||
class DownloadSender:
    """Fetch a strided series of file parts through one dedicated MTProto sender.

    The stored ``GetFileRequest`` is re-sent on every :meth:`next` call; after each
    call its offset is advanced by ``stride`` bytes, until ``count`` parts were read.
    """

    client: TelegramClient
    sender: MTProtoSender
    request: GetFileRequest
    remaining: int
    stride: int

    def __init__(self, client: TelegramClient, sender: MTProtoSender, file: TypeLocation, offset: int, limit: int,
                 stride: int, count: int) -> None:
        self.client = client
        self.sender = sender
        self.stride = stride
        self.remaining = count
        self.request = GetFileRequest(file, offset=offset, limit=limit)

    async def next(self) -> Optional[bytes]:
        """Return the bytes of the next part, or ``None`` when no parts remain."""
        if self.remaining:
            reply = await self.client._call(self.sender, self.request)
            self.remaining -= 1
            # Move the request window forward for the following call.
            self.request.offset += self.stride
            return reply.bytes
        return None

    def disconnect(self) -> Awaitable[None]:
        """Return the awaitable that disconnects the underlying sender."""
        return self.sender.disconnect()
|
||||
|
||||
|
||||
class UploadSender:
    """Send a strided series of file parts through one dedicated MTProto sender.

    At most one part is in flight per sender: :meth:`next` waits for the previously
    scheduled part before scheduling the new one as a task on ``loop``.
    """

    client: TelegramClient
    sender: MTProtoSender
    request: Union[SaveFilePartRequest, SaveBigFilePartRequest]
    part_count: int
    stride: int
    previous: Optional[asyncio.Task]
    loop: asyncio.AbstractEventLoop

    def __init__(self, client: TelegramClient, sender: MTProtoSender, file_id: int, part_count: int, big: bool,
                 index: int,
                 stride: int, loop: asyncio.AbstractEventLoop) -> None:
        self.client = client
        self.sender = sender
        self.part_count = part_count
        self.stride = stride
        self.loop = loop
        self.previous = None
        # The request object is reused for every part; only bytes/file_part change.
        self.request = (SaveBigFilePartRequest(file_id, index, part_count, b"") if big
                        else SaveFilePartRequest(file_id, index, b""))

    async def _wait_previous(self) -> None:
        """Wait for the part scheduled by the last :meth:`next` call, if any."""
        if self.previous:
            await self.previous

    async def next(self, data: bytes) -> None:
        """Schedule ``data`` as the next part once the previous part has been sent."""
        await self._wait_previous()
        self.previous = self.loop.create_task(self._next(data))

    async def _next(self, data: bytes) -> None:
        """Send ``data`` as the current part, then advance the part index by ``stride``."""
        self.request.bytes = data
        log.debug(f"Sending file part {self.request.file_part}/{self.part_count}"
                  f" with {len(data)} bytes")
        await self.client._call(self.sender, self.request)
        self.request.file_part += self.stride

    async def disconnect(self) -> None:
        """Wait for the in-flight part (if any) and disconnect the sender."""
        await self._wait_previous()
        return await self.sender.disconnect()
|
||||
|
||||
|
||||
class ParallelTransferrer:
    """Spread a single file transfer over several MTProto connections to one DC.

    A transfer is split into parts; sender ``i`` of ``n`` handles parts
    ``i, i + n, i + 2n, ...``. Uploads are driven by :meth:`init_upload`,
    :meth:`upload` and :meth:`finish_upload`; downloads by the :meth:`download`
    async generator.
    """

    client: "TelegramClient"
    loop: asyncio.AbstractEventLoop
    dc_id: int
    senders: Optional[List[Union["DownloadSender", "UploadSender"]]]
    auth_key: "AuthKey"
    upload_ticker: int

    def __init__(self, client: "TelegramClient", dc_id: Optional[int] = None) -> None:
        self.client = client
        self.loop = self.client.loop
        self.dc_id = dc_id or self.client.session.dc_id
        # When targeting a DC other than the session's own, start without a key so
        # that _create_sender exports/imports the authorization on first use.
        self.auth_key = (None if dc_id and self.client.session.dc_id != dc_id
                         else self.client.session.auth_key)
        self.senders = None
        self.upload_ticker = 0

    async def _cleanup(self) -> None:
        """Disconnect every sender; a no-op when no senders were created."""
        senders, self.senders = self.senders, None
        if not senders:
            return
        await asyncio.gather(*[sender.disconnect() for sender in senders])

    @staticmethod
    def _get_connection_count(file_size: int, max_count: int = 20,
                              full_size: int = 100 * 1024 * 1024) -> int:
        """Return how many connections to use for a file of ``file_size`` bytes.

        Scales linearly up to ``max_count`` at ``full_size`` bytes. The result is
        never below 1: an empty file would otherwise yield 0 connections, which
        makes the later ``divmod``/modulo by the connection count fail.
        """
        if file_size > full_size:
            return max_count
        return max(1, math.ceil((file_size / full_size) * max_count))

    async def _init_download(self, connections: int, file: "TypeLocation", part_count: int,
                             part_size: int) -> None:
        """Create ``connections`` download senders and divide ``part_count`` parts among them."""
        minimum, remainder = divmod(part_count, connections)

        def get_part_count() -> int:
            # The first ``remainder`` senders take one extra part each.
            nonlocal remainder
            if remainder > 0:
                remainder -= 1
                return minimum + 1
            return minimum

        # The first cross-DC sender will export+import the authorization, so we always create it
        # before creating any other senders.
        self.senders = [
            await self._create_download_sender(file, 0, part_size, connections * part_size,
                                               get_part_count()),
            *await asyncio.gather(
                *[self._create_download_sender(file, i, part_size, connections * part_size,
                                               get_part_count())
                  for i in range(1, connections)])
        ]

    async def _create_download_sender(self, file: "TypeLocation", index: int, part_size: int,
                                      stride: int,
                                      part_count: int) -> "DownloadSender":
        """Build a download sender starting at part ``index`` with the given byte ``stride``."""
        return DownloadSender(self.client, await self._create_sender(), file, index * part_size, part_size,
                              stride, part_count)

    async def _init_upload(self, connections: int, file_id: int, part_count: int, big: bool
                           ) -> None:
        """Create ``connections`` upload senders; the first one is created on its own (auth export)."""
        self.senders = [
            await self._create_upload_sender(file_id, part_count, big, 0, connections),
            *await asyncio.gather(
                *[self._create_upload_sender(file_id, part_count, big, i, connections)
                  for i in range(1, connections)])
        ]

    async def _create_upload_sender(self, file_id: int, part_count: int, big: bool, index: int,
                                    stride: int) -> "UploadSender":
        """Build an upload sender starting at part ``index`` that advances by ``stride`` parts."""
        return UploadSender(self.client, await self._create_sender(), file_id, part_count, big, index, stride,
                            loop=self.loop)

    async def _create_sender(self) -> "MTProtoSender":
        """Connect a new sender to ``self.dc_id``, importing the authorization if no key is known yet."""
        dc = await self.client._get_dc(self.dc_id)
        sender = MTProtoSender(self.auth_key, loggers=self.client._log)
        await sender.connect(self.client._connection(dc.ip_address, dc.port, dc.id,
                                                     loggers=self.client._log,
                                                     proxy=self.client._proxy))
        if not self.auth_key:
            log.debug(f"Exporting auth to DC {self.dc_id}")
            auth = await self.client(ExportAuthorizationRequest(self.dc_id))
            self.client._init_request.query = ImportAuthorizationRequest(id=auth.id,
                                                                         bytes=auth.bytes)
            req = InvokeWithLayerRequest(LAYER, self.client._init_request)
            await sender.send(req)
            # Remember the key so subsequent senders skip the export/import step.
            self.auth_key = sender.auth_key
        return sender

    async def init_upload(self, file_id: int, file_size: int, part_size_kb: Optional[float] = None,
                          connection_count: Optional[int] = None) -> Tuple[int, int, bool]:
        """Prepare upload senders and return ``(part_size, part_count, is_large)``."""
        connection_count = connection_count or self._get_connection_count(file_size)
        part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024
        part_count = (file_size + part_size - 1) // part_size
        is_large = file_size > 10 * 1024 * 1024
        await self._init_upload(connection_count, file_id, part_count, is_large)
        return part_size, part_count, is_large

    async def upload(self, part: bytes) -> None:
        """Hand ``part`` to the next sender in round-robin order."""
        await self.senders[self.upload_ticker].next(part)
        self.upload_ticker = (self.upload_ticker + 1) % len(self.senders)

    async def finish_upload(self) -> None:
        """Wait for outstanding parts and disconnect all upload senders."""
        await self._cleanup()

    async def download(self, file: "TypeLocation", file_size: int,
                       part_size_kb: Optional[float] = None,
                       connection_count: Optional[int] = None) -> AsyncGenerator[bytes, None]:
        """Yield the file's parts in order, fetching one part per sender per round.

        The senders are always disconnected when the generator finishes, fails or
        is closed early.
        """
        connection_count = connection_count or self._get_connection_count(file_size)
        part_size = (part_size_kb or utils.get_appropriated_part_size(file_size)) * 1024
        part_count = math.ceil(file_size / part_size)
        log.debug("Starting parallel download: "
                  f"{connection_count} {part_size} {part_count} {file!s}")
        try:
            await self._init_download(connection_count, file, part_count, part_size)

            part = 0
            while part < part_count:
                tasks = [self.loop.create_task(sender.next()) for sender in self.senders]
                progressed = False
                for task in tasks:
                    data = await task
                    if not data:
                        break
                    yield data
                    part += 1
                    progressed = True
                    log.debug(f"Part {part} downloaded")
                if not progressed:
                    # No sender produced data this round; looping again would spin forever.
                    break

            log.debug("Parallel download finished, cleaning up connections")
        finally:
            await self._cleanup()
|
||||
|
||||
|
||||
parallel_transfer_locks: DefaultDict[int, asyncio.Lock] = defaultdict(lambda: asyncio.Lock())
|
||||
|
||||
|
||||
def stream_file(file_to_stream: BinaryIO, chunk_size=1024):
    """Yield successive reads of up to ``chunk_size`` from ``file_to_stream``.

    Iteration stops at the first empty (falsy) read.
    """
    chunk = file_to_stream.read(chunk_size)
    while chunk:
        yield chunk
        chunk = file_to_stream.read(chunk_size)
|
||||
|
||||
|
||||
async def _internal_transfer_to_telegram(client: TelegramClient,
                                         response: BinaryIO,
                                         progress_callback: callable
                                         ) -> Tuple[TypeInputFile, int]:
    """Upload the file object ``response`` in parallel and return ``(input_file, size)``.

    The file is read through ``stream_file`` and regrouped into parts of exactly
    ``part_size`` bytes (the last part may be shorter). ``progress_callback``, when
    truthy, is called with ``(bytes_read, file_size)`` after every read and awaited
    if it returns an awaitable. An MD5 digest is only computed for files that are
    not uploaded as "big" files.
    """
    file_id = helpers.generate_random_long()
    file_size = os.path.getsize(response.name)

    hash_md5 = hashlib.md5()
    uploader = ParallelTransferrer(client)
    part_size, part_count, is_large = await uploader.init_upload(file_id, file_size)

    pending = bytearray()
    for chunk in stream_file(response):
        if progress_callback:
            outcome = progress_callback(response.tell(), file_size)
            if inspect.isawaitable(outcome):
                await outcome
        if not is_large:
            hash_md5.update(chunk)

        # Fast path: nothing buffered and the chunk is already a full part.
        if len(pending) == 0 and len(chunk) == part_size:
            await uploader.upload(chunk)
            continue

        if len(pending) + len(chunk) < part_size:
            pending.extend(chunk)
            continue

        # Fill the buffer up to one full part, send it, keep the overflow.
        room = part_size - len(pending)
        pending.extend(chunk[:room])
        await uploader.upload(bytes(pending))
        pending.clear()
        pending.extend(chunk[room:])

    if len(pending) > 0:
        await uploader.upload(bytes(pending))
    await uploader.finish_upload()

    if is_large:
        return InputFileBig(file_id, part_count, "upload"), file_size
    return InputFile(file_id, part_count, "upload", hash_md5.hexdigest()), file_size
|
||||
|
||||
|
||||
async def download_file(client: TelegramClient,
                        location: TypeLocation,
                        out: BinaryIO,
                        progress_callback: callable = None
                        ) -> BinaryIO:
    """Download ``location`` in parallel, writing the parts to ``out`` in order.

    ``progress_callback``, when given, is called with ``(out.tell(), size)`` after
    each written part and awaited if it returns an awaitable. Returns ``out``.

    NOTE(review): an earlier comment here said transfers are locked because of
    Telegram's connection count limits, but no lock is acquired in this function.
    """
    size = location.size
    dc_id, location = utils.get_input_location(location)
    transferrer = ParallelTransferrer(client, dc_id)
    async for chunk in transferrer.download(location, size):
        out.write(chunk)
        if not progress_callback:
            continue
        outcome = progress_callback(out.tell(), size)
        if inspect.isawaitable(outcome):
            await outcome

    return out
|
||||
|
||||
|
||||
async def upload_file(client: TelegramClient,
                      file: BinaryIO,
                      progress_callback: callable = None,

                      ) -> TypeInputFile:
    """Upload ``file`` in parallel and return only the resulting input-file object."""
    input_file, _file_size = await _internal_transfer_to_telegram(client, file, progress_callback)
    return input_file
|
19
dl_test.py
19
dl_test.py
@ -1,19 +0,0 @@
|
||||
#!/usr/local/bin/python3
|
||||
# coding: utf-8
|
||||
|
||||
# ytdl-bot - dl_test.py
|
||||
# 5/4/21 12:56
|
||||
#
|
||||
|
||||
__author__ = "Benny <benny.think@gmail.com>"
|
||||
|
||||
import youtube_dl
|
||||
|
||||
ydl_opts = {
|
||||
# %(title)s.%(ext)s
|
||||
'outtmpl': '/Users/benny/Downloads/abc/%(title)s.%(ext)s',
|
||||
|
||||
}
|
||||
|
||||
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
|
||||
ydl.download(['https://www.youtube.com/watch?v=BaW_jenozKc'])
|
122
downloader.py
Normal file
122
downloader.py
Normal file
@ -0,0 +1,122 @@
|
||||
#!/usr/local/bin/python3
|
||||
# coding: utf-8
|
||||
|
||||
# ytdlbot - downloader.py
|
||||
# 8/14/21 16:53
|
||||
#
|
||||
|
||||
__author__ = "Benny <benny.think@gmail.com>"
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import subprocess
|
||||
import traceback
|
||||
|
||||
import fakeredis
|
||||
import filetype
|
||||
import youtube_dl
|
||||
from youtube_dl import DownloadError
|
||||
|
||||
r = fakeredis.FakeStrictRedis()
|
||||
EXPIRE = 5
|
||||
|
||||
|
||||
def sizeof_fmt(num: int, suffix='B'):
    """Format ``num`` with a binary (1024-based) unit prefix, e.g. ``1536`` -> ``"1.5KiB"``.

    Values too large for the ``Zi`` prefix are reported in ``Yi``.
    """
    prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    position = 0
    while position < len(prefixes):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, prefixes[position], suffix)
        num = num / 1024.0
        position += 1
    return "%.1f%s%s" % (num, 'Yi', suffix)
|
||||
|
||||
|
||||
def edit_text(bot_msg, text):
    """Edit ``bot_msg`` to ``text``, throttled per message via a key expiring after ``EXPIRE``.

    While the key for this message id still exists in ``r`` the edit is skipped.
    """
    throttle_key = bot_msg.message_id
    if r.exists(throttle_key):
        # A recent edit already went out for this message; drop this one.
        return
    r.set(throttle_key, "ok", ex=EXPIRE)
    bot_msg.edit_text(text)
|
||||
|
||||
|
||||
def download_hook(d: dict, bot_msg):
    """youtube-dl progress hook: show download progress by editing ``bot_msg``.

    Only acts on ``status == 'downloading'`` events.

    :param d: progress dictionary passed by youtube-dl to progress hooks.
    :param bot_msg: message object forwarded to ``edit_text``.
    :raises Exception: when the (estimated) total size exceeds 2 GiB.
    """
    if d['status'] != 'downloading':
        return

    downloaded = d.get("downloaded_bytes", 0)
    # Both size fields may be missing or explicitly None; fall back to 0 so the
    # formatting and the size comparison below never see None.
    total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
    filesize = sizeof_fmt(total)
    if total > 2 * 1024 * 1024 * 1024:
        raise Exception("\n\nYour video is too large. %s will exceed Telegram's max limit 2GiB" % filesize)

    percent = d.get("_percent_str", "N/A")
    speed = d.get("_speed_str", "N/A")
    text = f'[{filesize}]: Downloading {percent} - {downloaded}/{total} @ {speed}'
    edit_text(bot_msg, text)
|
||||
|
||||
|
||||
def upload_hook(current, total, bot_msg):
    """Upload progress callback: report ``current``/``total`` by editing ``bot_msg``."""
    filesize = sizeof_fmt(total)
    percent = round(current / total * 100, 2)
    edit_text(bot_msg, f'[{filesize}]: Uploading {percent}% - {current}/{total}')
|
||||
|
||||
|
||||
def convert_to_mp4(resp: dict):
    """Convert a downloaded file to mp4 with ffmpeg when its detected type requires it.

    ``resp`` is the download result dict (keys ``status`` and ``filepath`` are read).
    When ``status`` is truthy and the file's detected MIME type is ``video/x-flv``,
    ffmpeg is run and ``resp["filepath"]`` is updated in place to the new ``.mp4``.

    :returns: the same ``resp`` dict.
    :raises subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    default_type = ["video/x-flv"]
    if not resp["status"]:
        return resp

    path = resp["filepath"]
    guessed = filetype.guess(path)
    # filetype.guess() returns None for content it cannot identify.
    mime = guessed.mime if guessed is not None else None
    if mime in default_type:
        new_name = os.path.basename(path).split(".")[0] + ".mp4"
        new_file_path = os.path.join(os.path.dirname(path), new_name)
        logging.info("Detected %s, converting to mp4...", mime)
        # Pass an argument list so paths containing whitespace stay intact.
        subprocess.check_output(["ffmpeg", "-i", path, new_file_path])
        resp["filepath"] = new_file_path
    return resp
|
||||
|
||||
|
||||
def ytdl_download(url, tempdir, bm) -> dict:
    """Download ``url`` into ``tempdir`` with youtube-dl, trying several format selectors.

    The selectors are tried in order; the final empty selector means "pass no
    ``format`` option at all".

    :param url: the URL to download.
    :param tempdir: directory that receives the downloaded file.
    :param bm: bot message forwarded to ``download_hook`` for progress updates.
    :returns: dict with ``status`` (bool), ``error`` (traceback text of the last
        failed attempt, or None) and ``filepath`` (path of the first entry found
        in ``tempdir`` on success, or None).
    """
    response = dict(status=None, error=None, filepath=None)
    logging.info("Downloading for %s", url)
    base_opts = {
        'progress_hooks': [lambda d: download_hook(d, bm)],
        'outtmpl': os.path.join(tempdir, '%(title)s.%(ext)s'),
        'restrictfilenames': True,
        'quiet': True
    }
    formats = [
        "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio",
        "bestvideo[vcodec^=avc]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best",
        ""
    ]
    err = None
    for selector in formats:
        # Use a fresh copy per attempt: with a shared dict the previous selector
        # would stick and the empty fallback would just repeat it.
        attempt_opts = dict(base_opts)
        if selector:
            attempt_opts["format"] = selector
        try:
            with youtube_dl.YoutubeDL(attempt_opts) as ydl:
                ydl.download([url])
        except DownloadError:
            err = traceback.format_exc()
            logging.error("Download failed for %s ", url)
            continue

        response["status"] = True
        response["filepath"] = os.path.join(tempdir, os.listdir(tempdir)[0])
        break
    else:
        # Every selector failed.
        response["status"] = False
        response["error"] = err
    # convert format if necessary
    convert_to_mp4(response)
    return response
|
||||
|
||||
|
||||
def convert_flac(flac_name, tmp):
    """Extract the audio stream of ``tmp`` into a sibling file named ``flac_name``.

    Runs ``ffmpeg -y -i <tmp.name> -vn -acodec copy <output>``, i.e. the audio is
    copied without re-encoding and any existing output file is overwritten.

    :param flac_name: file name of the output, created next to ``tmp``.
    :param tmp: object whose ``name`` attribute is the input file path.
    :returns: POSIX-style path of the output file.
    :raises subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    flac_tmp = pathlib.Path(tmp.name).parent.joinpath(flac_name).as_posix()
    # ffmpeg -i input-video.avi -vn -acodec copy output-audio.m4a
    # Argument list instead of a split string so names with whitespace survive.
    cmd = ["ffmpeg", "-y", "-i", tmp.name, "-vn", "-acodec", "copy", flac_tmp]
    logging.debug("running %s", cmd)
    logging.info("converting to flac")
    subprocess.check_output(cmd)
    return flac_tmp
|
@ -1,9 +1,6 @@
|
||||
pyrogram==1.2.9
|
||||
tgcrypto==1.2.2
|
||||
youtube-dl==2021.6.6
|
||||
tgbot-ping
|
||||
cryptg
|
||||
fakeredis
|
||||
|
||||
telethon==1.21.1
|
||||
youtube-dl==2021.4.26
|
||||
hachoir==3.1.2
|
||||
|
||||
filetype
|
366
ytdl.py
366
ytdl.py
@ -1,320 +1,128 @@
|
||||
#!/usr/local/bin/python3
|
||||
# coding: utf-8
|
||||
|
||||
# ytdl-bot - bot.py
|
||||
# 5/3/21 18:31
|
||||
# ytdlbot - new.py
|
||||
# 8/14/21 14:37
|
||||
#
|
||||
|
||||
__author__ = "Benny <benny.think@gmail.com>"
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import platform
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
import tempfile
|
||||
import threading
|
||||
import traceback
|
||||
import typing
|
||||
|
||||
import fakeredis
|
||||
import filetype
|
||||
import youtube_dl
|
||||
from hachoir.metadata import extractMetadata
|
||||
from hachoir.metadata.audio import FlacMetadata, MpegAudioMetadata
|
||||
from hachoir.metadata.video import MkvMetadata, MP4Metadata
|
||||
from hachoir.parser import createParser
|
||||
from telethon import Button, TelegramClient, events
|
||||
from telethon.tl.types import (DocumentAttributeAudio,
|
||||
DocumentAttributeFilename,
|
||||
DocumentAttributeVideo)
|
||||
from telethon.utils import get_input_media
|
||||
from pyrogram import Client, filters, types
|
||||
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from tgbot_ping import get_runtime
|
||||
from youtube_dl.utils import DownloadError
|
||||
|
||||
from FastTelethon import download_file, upload_file
|
||||
from downloader import convert_flac, upload_hook, ytdl_download
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s')
|
||||
logging.getLogger('telethon').setLevel(logging.WARNING)
|
||||
|
||||
token = os.getenv("TOKEN") or "17Zg"
|
||||
app_id = int(os.getenv("APP_ID") or "922")
|
||||
app_hash = os.getenv("APP_HASH") or "490"
|
||||
|
||||
bot = TelegramClient('bot', app_id, app_hash,
|
||||
device_model=f"{platform.system()} {platform.node()}-{os.path.basename(__file__)}",
|
||||
system_version=platform.platform()).start(bot_token=token)
|
||||
|
||||
r = fakeredis.FakeStrictRedis()
|
||||
EXPIRE = 5
|
||||
api_id = int(os.getenv("APP_ID", 0))
|
||||
api_hash = os.getenv("APP_HASH")
|
||||
token = os.getenv("TOKEN")
|
||||
app = Client("ytdl", api_id, api_hash, bot_token=token, workers=20)
|
||||
|
||||
|
||||
def get_metadata(video_path):
    """Extract duration/size metadata and the MIME type of a media file with hachoir.

    :returns: ``(attributes, mime_type)``. ``attributes`` holds ``duration`` in
        seconds plus ``w``/``h`` where the container type provides them (FLAC
        yields only ``duration``). On any failure the error is logged and
        ``(dict(duration=0, w=0, h=0), 'application/octet-stream')`` is returned.
    """
    try:
        parser = createParser(video_path)
        if parser is None:
            raise ValueError("Unable to parse %s" % video_path)
        # Close the parser's input stream as soon as the metadata is extracted.
        with parser:
            metadata = extractMetadata(parser)

        duration = metadata.get('duration').seconds
        if isinstance(metadata, MkvMetadata):
            # Matroska keeps the dimensions in the first video track group.
            video_track = metadata['video[1]']
            attributes = dict(duration=duration,
                              w=video_track.get('width'),
                              h=video_track.get('height'))
        elif isinstance(metadata, FlacMetadata):
            attributes = dict(duration=duration)
        else:
            attributes = dict(duration=duration,
                              w=metadata.get('width', 0),
                              h=metadata.get('height', 0))
        return attributes, metadata.get('mime_type')
    except Exception as e:
        logging.error(e)
        return dict(duration=0, w=0, h=0), 'application/octet-stream'
|
||||
|
||||
|
||||
def go(chat_id, message, msg):
    """Blocking wrapper: run ``sync_edit_message`` to completion via ``asyncio.run``."""
    pending_edit = sync_edit_message(chat_id, message, msg)
    asyncio.run(pending_edit)
|
||||
|
||||
|
||||
def sizeof_fmt(num: int, suffix='B'):
    """Render ``num`` using binary (1024-based) unit prefixes, e.g. ``1024`` -> ``"1.0KiB"``.

    Anything beyond the ``Zi`` range is expressed in ``Yi``.
    """
    for prefix in ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi'):
        if not abs(num) < 1024.0:
            num /= 1024.0
            continue
        return "%3.1f%s%s" % (num, prefix, suffix)
    return "%.1f%s%s" % (num, 'Yi', suffix)
|
||||
|
||||
|
||||
def progress_hook(d: dict, chat_id, message):
    """youtube-dl progress hook: push a progress text to the chat from a new thread.

    Only acts on ``status == 'downloading'`` events. The edit itself is done by
    ``go`` in a separate thread.

    :param d: progress dictionary passed by youtube-dl to progress hooks.
    :raises Exception: when the (estimated) total size exceeds 2 GiB.
    """
    if d['status'] != 'downloading':
        return

    downloaded = d.get("downloaded_bytes", 0)
    # Both size fields may be missing or explicitly None; fall back to 0 so the
    # formatting and the size comparison below never see None.
    total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0
    filesize = sizeof_fmt(total)
    if total > 2 * 1024 * 1024 * 1024:
        raise Exception("\n\nYour video is too large. %s will exceed Telegram's max limit 2GiB" % filesize)

    percent = d.get("_percent_str", "N/A")
    speed = d.get("_speed_str", "N/A")
    msg = f'[{filesize}]: Downloading {percent} - {downloaded}/{total} @ {speed}'
    threading.Thread(target=go, args=(chat_id, message, msg)).start()
|
||||
|
||||
|
||||
def run_in_executor(f):
    """Decorator that runs the blocking callable ``f`` in the running loop's default executor.

    The decorated function must be called while an event loop is running; it
    returns the awaitable produced by ``loop.run_in_executor``.
    """

    @functools.wraps(f)
    def inner(*args, **kwargs):
        bound_call = functools.partial(f, *args, **kwargs)
        return asyncio.get_running_loop().run_in_executor(None, bound_call)

    return inner
|
||||
|
||||
|
||||
@run_in_executor
def ytdl_download(url, tempdir, chat_id, message) -> dict:
    """Download ``url`` into ``tempdir`` with youtube-dl, trying several format selectors.

    The selectors are tried in order; the final empty selector means "pass no
    ``format`` option at all". Because of the ``run_in_executor`` decorator the
    call returns an awaitable that resolves to the result dict.

    :param chat_id: forwarded to ``progress_hook``.
    :param message: forwarded to ``progress_hook``.
    :returns: dict with ``status`` (bool), ``error`` (traceback text of the last
        failed attempt, or None) and ``filepath`` (path of the first entry found
        in ``tempdir`` on success, or None).
    """
    response = dict(status=None, error=None, filepath=None)
    logging.info("Downloading for %s", url)
    base_opts = {
        'progress_hooks': [lambda d: progress_hook(d, chat_id, message)],
        'outtmpl': os.path.join(tempdir, '%(title)s.%(ext)s'),
        'restrictfilenames': True,
        'quiet': True
    }
    formats = [
        "bestvideo[ext=mp4]+bestaudio[ext=m4a]/bestvideo+bestaudio",
        "bestvideo[vcodec^=avc]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best",
        ""
    ]
    err = None
    for selector in formats:
        # Use a fresh copy per attempt: with a shared dict the previous selector
        # would stick and the empty fallback would just repeat it.
        attempt_opts = dict(base_opts)
        if selector:
            attempt_opts["format"] = selector
        try:
            with youtube_dl.YoutubeDL(attempt_opts) as ydl:
                ydl.download([url])
        except DownloadError:
            err = traceback.format_exc()
            logging.error("Download failed for %s ", url)
            continue

        response["status"] = True
        response["filepath"] = os.path.join(tempdir, os.listdir(tempdir)[0])
        break
    else:
        # Every selector failed.
        response["status"] = False
        response["error"] = err
    # convert format if necessary
    convert_to_mp4(response)
    return response
|
||||
|
||||
|
||||
def convert_to_mp4(resp: dict):
    """Convert a downloaded file to mp4 with ffmpeg when its detected type requires it.

    ``resp`` is the download result dict (keys ``status`` and ``filepath`` are read).
    When ``status`` is truthy and the file's detected MIME type is ``video/x-flv``,
    ffmpeg is run and ``resp["filepath"]`` is updated in place to the new ``.mp4``.

    :returns: the same ``resp`` dict.
    :raises subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    default_type = ["video/x-flv"]
    if not resp["status"]:
        return resp

    path = resp["filepath"]
    guessed = filetype.guess(path)
    # filetype.guess() returns None for content it cannot identify.
    mime = guessed.mime if guessed is not None else None
    if mime in default_type:
        new_name = os.path.basename(path).split(".")[0] + ".mp4"
        new_file_path = os.path.join(os.path.dirname(path), new_name)
        logging.info("Detected %s, converting to mp4...", mime)
        # Pass an argument list so paths containing whitespace stay intact.
        subprocess.check_output(["ffmpeg", "-i", path, new_file_path])
        resp["filepath"] = new_file_path
    return resp
|
||||
|
||||
|
||||
async def upload_callback(current, total, chat_id, message):
    """Upload progress callback: edit ``message`` with the progress, throttled per message.

    The edit is skipped while the ``"<chat_id>-<message.id>"`` key still exists in ``r``.
    """
    key = f"{chat_id}-{message.id}"
    if r.exists(key):
        # An edit for this message went out recently; skip to avoid flooding.
        return
    r.set(key, "ok", ex=EXPIRE)
    filesize = sizeof_fmt(total)
    percent = round(current / total * 100, 2)
    msg = f'[{filesize}]: Uploading {percent}% - {current}/{total}'
    await bot.edit_message(chat_id, message, msg)
|
||||
|
||||
|
||||
async def sync_edit_message(chat_id, message, msg):
    """Edit ``message`` to ``msg`` unless its throttle key still exists in ``r``."""
    # try to avoid flood
    key = f"{chat_id}-{message.id}"
    if r.exists(key):
        return
    r.set(key, "ok", ex=EXPIRE)
    await bot.edit_message(chat_id, message, msg)
|
||||
|
||||
|
||||
# bot starts here
|
||||
@bot.on(events.NewMessage(pattern='/start'))
|
||||
async def send_start(event):
|
||||
@app.on_message(filters.command(["start"]))
|
||||
def start_handler(client: "Client", message: "types.Message"):
|
||||
chat_id = message.chat.id
|
||||
logging.info("Welcome to youtube-dl bot!")
|
||||
async with bot.action(event.chat_id, 'typing'):
|
||||
await bot.send_message(event.chat_id, "Wrapper for youtube-dl.")
|
||||
raise events.StopPropagation
|
||||
client.send_chat_action(chat_id, "typing")
|
||||
client.send_message(message.chat.id, "Wrapper for youtube-dl.")
|
||||
|
||||
|
||||
async def convert_flac(flac_name, tmp):
|
||||
flac_tmp = pathlib.Path(tmp.name).parent.joinpath(flac_name).as_posix()
|
||||
# ffmpeg -i input-video.avi -vn -acodec copy output-audio.m4a
|
||||
cmd = "ffmpeg -y -i {} -vn -acodec copy {}".format(tmp.name, flac_tmp)
|
||||
print(cmd)
|
||||
logging.info("converting to flac")
|
||||
subprocess.check_output(cmd.split())
|
||||
return flac_tmp
|
||||
@app.on_message(filters.command(["help"]))
|
||||
def help_handler(client: "Client", message: "types.Message"):
|
||||
chat_id = message.chat.id
|
||||
client.send_chat_action(chat_id, "typing")
|
||||
client.send_message(chat_id, "Stop working? "
|
||||
"Wait a few seconds, send your link again or report bugs at "
|
||||
"https://github.com/tgbot-collection/ytdl-bot/issues")
|
||||
|
||||
|
||||
@bot.on(events.CallbackQuery)
|
||||
async def handler(event):
|
||||
await event.answer('Converting to audio...please wait patiently')
|
||||
msg = await event.get_message()
|
||||
chat_id = msg.chat_id
|
||||
mp4_name = msg.file.name # 'youtube-dl_test_video_a.mp4'
|
||||
flac_name = mp4_name.replace("mp4", "m4a")
|
||||
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
with open(tmp.name, "wb") as out:
|
||||
logging.info("downloading to %s", tmp.name)
|
||||
async with bot.action(chat_id, 'record-round'):
|
||||
await download_file(event.client, msg.media.document, out, )
|
||||
logging.info("downloading complete %s", tmp.name)
|
||||
# execute ffmpeg
|
||||
async with bot.action(chat_id, 'record-audio'):
|
||||
await asyncio.sleep(1)
|
||||
flac_tmp = await convert_flac(flac_name, tmp)
|
||||
async with bot.action(chat_id, 'document'):
|
||||
logging.info("Converting flac complete, sending...")
|
||||
# with open(flac_tmp, 'rb') as f:
|
||||
# input_file = await upload_file(bot, f)
|
||||
# metadata, mime_type = get_metadata(flac_tmp)
|
||||
# input_media = get_input_media(input_file)
|
||||
# input_media.attributes = [
|
||||
# DocumentAttributeAudio(duration=metadata["duration"]),
|
||||
# DocumentAttributeFilename(flac_name),
|
||||
# ]
|
||||
# input_media.mime_type = mime_type
|
||||
# await bot.send_file(chat_id, input_media)
|
||||
# TODO temp
|
||||
await bot.send_file(chat_id, flac_tmp)
|
||||
os.unlink(flac_tmp)
|
||||
tmp.close()
|
||||
@app.on_message(filters.command(["ping"]))
|
||||
def ping_handler(client: "Client", message: "types.Message"):
|
||||
chat_id = message.chat.id
|
||||
client.send_chat_action(chat_id, "typing")
|
||||
bot_info = get_runtime("botsrunner_ytdl_1", "YouTube-dl")
|
||||
client.send_message(chat_id, bot_info)
|
||||
|
||||
|
||||
@bot.on(events.NewMessage(pattern='/help'))
|
||||
async def send_help(event):
|
||||
async with bot.action(event.chat_id, 'typing'):
|
||||
await bot.send_message(event.chat_id, "Bot is not working? "
|
||||
"Wait a few seconds, send your link again or report bugs at "
|
||||
"https://github.com/tgbot-collection/ytdl-bot/issues")
|
||||
raise events.StopPropagation
|
||||
@app.on_message(filters.command(["about"]))
|
||||
def help_handler(client: "Client", message: "types.Message"):
|
||||
chat_id = message.chat.id
|
||||
client.send_chat_action(chat_id, "typing")
|
||||
client.send_message(chat_id, "YouTube-DL by @BennyThink\n"
|
||||
"GitHub: https://github.com/tgbot-collection/ytdl-bot")
|
||||
|
||||
|
||||
@bot.on(events.NewMessage(pattern='/ping'))
|
||||
async def send_ping(event):
|
||||
async with bot.action(event.chat_id, 'typing'):
|
||||
bot_info = get_runtime("botsrunner_ytdl_1", "YouTube-dl")
|
||||
await bot.send_message(event.chat_id, f"{bot_info}\n", parse_mode='md')
|
||||
raise events.StopPropagation
|
||||
|
||||
|
||||
@bot.on(events.NewMessage(pattern='/about'))
|
||||
async def send_about(event):
|
||||
async with bot.action(event.chat_id, 'typing'):
|
||||
await bot.send_message(event.chat_id, "YouTube-DL by @BennyThink\n"
|
||||
"GitHub: https://github.com/tgbot-collection/ytdl-bot")
|
||||
raise events.StopPropagation
|
||||
|
||||
|
||||
@bot.on(events.NewMessage(incoming=True))
|
||||
async def send_video(event):
|
||||
chat_id = event.message.chat_id
|
||||
url = re.sub(r'/ytdl\s*', '', event.message.text)
|
||||
@app.on_message()
|
||||
def download_handler(client: "Client", message: "types.Message"):
|
||||
chat_id = message.chat.id
|
||||
url = re.sub(r'/ytdl\s*', '', message.text)
|
||||
logging.info("start %s", url)
|
||||
# if this is in a group/channel
|
||||
if not event.message.is_private and not event.message.text.lower().startswith("/ytdl"):
|
||||
logging.warning("%s, it's annoying me...🙄️ ", event.message.text)
|
||||
return
|
||||
|
||||
if not re.findall(r"^https?://", url.lower()):
|
||||
await event.reply("I think you should send me a link. Don't you agree with me?")
|
||||
message.reply_text("I think you should send me a link.", quote=True)
|
||||
return
|
||||
|
||||
message = await event.reply("Processing...")
|
||||
bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text("Processing", quote=True)
|
||||
client.send_chat_action(chat_id, 'upload_video')
|
||||
temp_dir = tempfile.TemporaryDirectory()
|
||||
|
||||
async with bot.action(chat_id, 'video'):
|
||||
result = await ytdl_download(url, temp_dir.name, chat_id, message)
|
||||
result = ytdl_download(url, temp_dir.name, bot_msg)
|
||||
logging.info("Download complete.")
|
||||
|
||||
# markup
|
||||
markup = bot.build_reply_markup(Button.inline('audio'))
|
||||
if result["status"]:
|
||||
async with bot.action(chat_id, 'document'):
|
||||
video_path = result["filepath"]
|
||||
await bot.edit_message(chat_id, message, 'Download complete. Sending now...')
|
||||
metadata, mime_type = get_metadata(video_path)
|
||||
with open(video_path, 'rb') as f:
|
||||
input_file = await upload_file(
|
||||
bot, f,
|
||||
progress_callback=lambda x, y: upload_callback(x, y, chat_id, message))
|
||||
input_media = get_input_media(input_file)
|
||||
file_name = os.path.basename(video_path)
|
||||
input_media.attributes = [
|
||||
DocumentAttributeVideo(round_message=False, supports_streaming=True, **metadata),
|
||||
DocumentAttributeFilename(file_name),
|
||||
markup = InlineKeyboardMarkup(
|
||||
[
|
||||
[ # First row
|
||||
InlineKeyboardButton( # Generates a callback query when pressed
|
||||
"audio",
|
||||
callback_data="audio"
|
||||
)
|
||||
]
|
||||
input_media.mime_type = mime_type
|
||||
# duration here is int - convert to timedelta
|
||||
metadata["duration_str"] = datetime.timedelta(seconds=metadata["duration"])
|
||||
metadata["size"] = sizeof_fmt(os.stat(video_path).st_size)
|
||||
caption = "{name}\n{duration_str} {size} {w}*{h}".format(name=file_name, **metadata)
|
||||
await bot.send_file(chat_id, input_media, caption=caption, buttons=markup)
|
||||
await bot.edit_message(chat_id, message, 'Download success!✅')
|
||||
]
|
||||
)
|
||||
|
||||
if result["status"]:
|
||||
client.send_chat_action(chat_id, 'upload_document')
|
||||
video_path = result["filepath"]
|
||||
bot_msg.edit_text('Download complete. Sending now...')
|
||||
client.send_video(chat_id, video_path, supports_streaming=True, caption=url,
|
||||
progress=upload_hook, progress_args=(bot_msg,), reply_markup=markup)
|
||||
bot_msg.edit_text('Download success!✅')
|
||||
else:
|
||||
async with bot.action(chat_id, 'typing'):
|
||||
tb = result["error"][0:4000]
|
||||
await bot.edit_message(chat_id, message, f"{url} download failed❌:\n```{tb}```",
|
||||
parse_mode='markdown')
|
||||
client.send_chat_action(chat_id, 'typing')
|
||||
tb = result["error"][0:4000]
|
||||
bot_msg.edit_text(f"{url} download failed❌:\n```{tb}```")
|
||||
|
||||
temp_dir.cleanup()
|
||||
|
||||
|
||||
@app.on_callback_query()
|
||||
def answer(client: "Client", callback_query: types.CallbackQuery):
|
||||
callback_query.answer(f"Converting to audio...please wait patiently")
|
||||
msg = callback_query.message
|
||||
|
||||
chat_id = msg.chat.id
|
||||
mp4_name = msg.video.file_name # 'youtube-dl_test_video_a.mp4'
|
||||
flac_name = mp4_name.replace("mp4", "m4a")
|
||||
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
logging.info("downloading to %s", tmp.name)
|
||||
client.send_chat_action(chat_id, 'record_video_note')
|
||||
client.download_media(msg, tmp.name)
|
||||
logging.info("downloading complete %s", tmp.name)
|
||||
# execute ffmpeg
|
||||
client.send_chat_action(chat_id, 'record_audio')
|
||||
flac_tmp = convert_flac(flac_name, tmp)
|
||||
client.send_chat_action(chat_id, 'upload_audio')
|
||||
client.send_audio(chat_id, flac_tmp)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
bot.run_until_disconnected()
|
||||
app.run()
|
||||
|
Loading…
Reference in New Issue
Block a user