diff --git a/requirements.txt b/requirements.txt index c750248..c46a145 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,13 @@ pyrogram==1.4.16 tgcrypto==1.2.3 -yt-dlp==2022.6.22.1 +yt-dlp==2022.6.29 APScheduler==3.9.1 beautifultable==1.1.0 ffmpeg-python==0.2.0 PyMySQL==1.0.2 celery==5.2.7 filetype==1.0.13 -flower==1.0.0 +flower==1.1.0 psutil==5.9.1 influxdb==5.3.1 beautifulsoup4==4.11.1 @@ -15,8 +15,8 @@ fakeredis==1.8.1 supervisor==4.2.4 tgbot-ping==1.0.4 redis==4.3.3 -requests==2.28.0 +requests==2.28.1 tqdm==4.64.0 requests-toolbelt==0.9.1 ffpb==0.4.1 -youtube-search-python==1.6.6 \ No newline at end of file +youtube-search-python==1.6.6 diff --git a/ytdlbot/config.py b/ytdlbot/config.py index fe82fa9..e8d84ad 100644 --- a/ytdlbot/config.py +++ b/ytdlbot/config.py @@ -46,7 +46,9 @@ REQUIRED_MEMBERSHIP: "str" = os.getenv("REQUIRED_MEMBERSHIP", "") # celery related ENABLE_CELERY = os.getenv("ENABLE_CELERY", False) +ENABLE_QUEUE = os.getenv("ENABLE_QUEUE", True) BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4") + MYSQL_HOST = os.getenv("MYSQL_HOST") MYSQL_USER = os.getenv("MYSQL_USER", "root") MYSQL_PASS = os.getenv("MYSQL_PASS", "root") diff --git a/ytdlbot/tasks.py b/ytdlbot/tasks.py index 3395e58..b5f1e55 100644 --- a/ytdlbot/tasks.py +++ b/ytdlbot/tasks.py @@ -10,6 +10,7 @@ __author__ = "Benny " import logging import os import pathlib +import random import re import subprocess import tempfile @@ -31,7 +32,7 @@ from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor from client_init import create_app from config import (ARCHIVE_ID, AUDIO_FORMAT, BROKER, ENABLE_CELERY, - ENABLE_VIP, TG_MAX_SIZE, WORKERS) + ENABLE_QUEUE, ENABLE_VIP, TG_MAX_SIZE, WORKERS) from constant import BotText from db import Redis from downloader import (edit_text, run_ffmpeg, sizeof_fmt, tqdm_progress, @@ -135,7 +136,8 @@ def ytdl_download_entrance(bot_msg, client, url): return mode = get_user_settings(str(chat_id))[-1] if ENABLE_CELERY and mode in [None, "Celery"]: - ytdl_download_task.delay(chat_id, bot_msg.message_id, url) + async_task(ytdl_download_task, chat_id, bot_msg.message_id, url) + # ytdl_download_task.delay(chat_id, bot_msg.message_id, url) else: ytdl_normal_download(bot_msg, client, url) @@ -151,7 +153,8 @@ def direct_download_entrance(bot_msg, client, url): def audio_entrance(bot_msg, client): if ENABLE_CELERY: - audio_task.delay(bot_msg.chat.id, bot_msg.message_id) + async_task(audio_task, bot_msg.chat.id, bot_msg.message_id) + # audio_task.delay(bot_msg.chat.id, bot_msg.message_id) else: normal_audio(bot_msg, client) @@ -284,12 +287,24 @@ def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.P chat_id = ARCHIVE_ID if settings[2] == "document": logging.info("Sending as document") - res_msg = client.send_document(chat_id, vp_or_fid, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - reply_markup=markup, - thumb=meta["thumb"] - ) + try: + # send as document could be sent as video even if it's a document + res_msg = client.send_document(chat_id, vp_or_fid, + caption=cap, + progress=upload_hook, progress_args=(bot_msg,), + reply_markup=markup, + thumb=meta["thumb"], + force_document=True + ) + except ValueError: + logging.error("Retry to send as video") + res_msg = client.send_video(chat_id, vp_or_fid, + supports_streaming=True, + caption=cap, + progress=upload_hook, progress_args=(bot_msg,), + reply_markup=markup, + **meta + ) elif settings[2] == "audio": logging.info("Sending as audio") res_msg = client.send_audio(chat_id, vp_or_fid, @@ -305,6 +320,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.P reply_markup=markup, **meta ) + unique = get_unique_clink(url, bot_msg.chat.id) obj = res_msg.document or res_msg.video or res_msg.audio red.add_send_cache(unique, getattr(obj, "file_id", None)) @@ -387,12 +403,33 @@ def hot_patch(*args): psutil.Process().kill() +def async_task(task_name, *args): + if not ENABLE_QUEUE: + task_name.delay(*args) + return + + t0 = time.time() + inspect = app.control.inspect() + worker_stats = inspect.stats() + route_queues = [] + for worker_name, stats in worker_stats.items(): + route = worker_name.split('@')[1] + concurrency = stats['pool']['max-concurrency'] + route_queues.extend([route] * concurrency) + destination = random.choice(route_queues) + logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0) + task_name.apply_async(args=args, queue=destination) + + def run_celery(): + worker_name = os.getenv("WORKER_NAME", "") argv = [ "-A", "tasks", 'worker', '--loglevel=info', "--pool=threads", f"--concurrency={WORKERS}", - "-n", os.getenv("WORKER_NAME", "") + "-n", worker_name ] + if ENABLE_QUEUE: + argv.extend(["-Q", worker_name]) app.worker_main(argv) diff --git a/ytdlbot/ytdl_bot.py b/ytdlbot/ytdl_bot.py index 5797687..9620e67 100644 --- a/ytdlbot/ytdl_bot.py +++ b/ytdlbot/ytdl_bot.py @@ -15,13 +15,14 @@ import time import traceback import typing from io import BytesIO -from youtubesearchpython import VideosSearch + import pyrogram.errors from apscheduler.schedulers.background import BackgroundScheduler from pyrogram import Client, filters, types from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from tgbot_ping import get_runtime +from youtubesearchpython import VideosSearch from client_init import create_app from config import (AUTHORIZED_USER, ENABLE_CELERY, ENABLE_VIP, OWNER,