add queue support

This commit is contained in:
BennyThink 2022-07-10 21:25:24 +08:00
parent 903dd58e1b
commit 11a7175672
No known key found for this signature in database
GPG Key ID: 6CD0DBDA5235D481
4 changed files with 55 additions and 15 deletions

View File

@ -1,13 +1,13 @@
pyrogram==1.4.16 pyrogram==1.4.16
tgcrypto==1.2.3 tgcrypto==1.2.3
yt-dlp==2022.6.22.1 yt-dlp==2022.6.29
APScheduler==3.9.1 APScheduler==3.9.1
beautifultable==1.1.0 beautifultable==1.1.0
ffmpeg-python==0.2.0 ffmpeg-python==0.2.0
PyMySQL==1.0.2 PyMySQL==1.0.2
celery==5.2.7 celery==5.2.7
filetype==1.0.13 filetype==1.0.13
flower==1.0.0 flower==1.1.0
psutil==5.9.1 psutil==5.9.1
influxdb==5.3.1 influxdb==5.3.1
beautifulsoup4==4.11.1 beautifulsoup4==4.11.1
@ -15,8 +15,8 @@ fakeredis==1.8.1
supervisor==4.2.4 supervisor==4.2.4
tgbot-ping==1.0.4 tgbot-ping==1.0.4
redis==4.3.3 redis==4.3.3
requests==2.28.0 requests==2.28.1
tqdm==4.64.0 tqdm==4.64.0
requests-toolbelt==0.9.1 requests-toolbelt==0.9.1
ffpb==0.4.1 ffpb==0.4.1
youtube-search-python==1.6.6 youtube-search-python==1.6.6

View File

@ -46,7 +46,9 @@ REQUIRED_MEMBERSHIP: "str" = os.getenv("REQUIRED_MEMBERSHIP", "")
# celery related # celery related
ENABLE_CELERY = os.getenv("ENABLE_CELERY", False) ENABLE_CELERY = os.getenv("ENABLE_CELERY", False)
ENABLE_QUEUE = os.getenv("ENABLE_QUEUE", True)
BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4") BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4")
MYSQL_HOST = os.getenv("MYSQL_HOST") MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_USER = os.getenv("MYSQL_USER", "root") MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASS = os.getenv("MYSQL_PASS", "root") MYSQL_PASS = os.getenv("MYSQL_PASS", "root")

View File

@ -10,6 +10,7 @@ __author__ = "Benny <benny.think@gmail.com>"
import logging import logging
import os import os
import pathlib import pathlib
import random
import re import re
import subprocess import subprocess
import tempfile import tempfile
@ -31,7 +32,7 @@ from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
from client_init import create_app from client_init import create_app
from config import (ARCHIVE_ID, AUDIO_FORMAT, BROKER, ENABLE_CELERY, from config import (ARCHIVE_ID, AUDIO_FORMAT, BROKER, ENABLE_CELERY,
ENABLE_VIP, TG_MAX_SIZE, WORKERS) ENABLE_QUEUE, ENABLE_VIP, TG_MAX_SIZE, WORKERS)
from constant import BotText from constant import BotText
from db import Redis from db import Redis
from downloader import (edit_text, run_ffmpeg, sizeof_fmt, tqdm_progress, from downloader import (edit_text, run_ffmpeg, sizeof_fmt, tqdm_progress,
@ -135,7 +136,8 @@ def ytdl_download_entrance(bot_msg, client, url):
return return
mode = get_user_settings(str(chat_id))[-1] mode = get_user_settings(str(chat_id))[-1]
if ENABLE_CELERY and mode in [None, "Celery"]: if ENABLE_CELERY and mode in [None, "Celery"]:
ytdl_download_task.delay(chat_id, bot_msg.message_id, url) async_task(ytdl_download_task, chat_id, bot_msg.message_id, url)
# ytdl_download_task.delay(chat_id, bot_msg.message_id, url)
else: else:
ytdl_normal_download(bot_msg, client, url) ytdl_normal_download(bot_msg, client, url)
@ -151,7 +153,8 @@ def direct_download_entrance(bot_msg, client, url):
def audio_entrance(bot_msg, client): def audio_entrance(bot_msg, client):
if ENABLE_CELERY: if ENABLE_CELERY:
audio_task.delay(bot_msg.chat.id, bot_msg.message_id) async_task(audio_task, bot_msg.chat.id, bot_msg.message_id)
# audio_task.delay(bot_msg.chat.id, bot_msg.message_id)
else: else:
normal_audio(bot_msg, client) normal_audio(bot_msg, client)
@ -284,12 +287,24 @@ def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.P
chat_id = ARCHIVE_ID chat_id = ARCHIVE_ID
if settings[2] == "document": if settings[2] == "document":
logging.info("Sending as document") logging.info("Sending as document")
res_msg = client.send_document(chat_id, vp_or_fid, try:
caption=cap, # send as document could be sent as video even if it's a document
progress=upload_hook, progress_args=(bot_msg,), res_msg = client.send_document(chat_id, vp_or_fid,
reply_markup=markup, caption=cap,
thumb=meta["thumb"] progress=upload_hook, progress_args=(bot_msg,),
) reply_markup=markup,
thumb=meta["thumb"],
force_document=True
)
except ValueError:
logging.error("Retry to send as video")
res_msg = client.send_video(chat_id, vp_or_fid,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
elif settings[2] == "audio": elif settings[2] == "audio":
logging.info("Sending as audio") logging.info("Sending as audio")
res_msg = client.send_audio(chat_id, vp_or_fid, res_msg = client.send_audio(chat_id, vp_or_fid,
@ -305,6 +320,7 @@ def upload_processor(client, bot_msg, url, vp_or_fid: "typing.Any[str, pathlib.P
reply_markup=markup, reply_markup=markup,
**meta **meta
) )
unique = get_unique_clink(url, bot_msg.chat.id) unique = get_unique_clink(url, bot_msg.chat.id)
obj = res_msg.document or res_msg.video or res_msg.audio obj = res_msg.document or res_msg.video or res_msg.audio
red.add_send_cache(unique, getattr(obj, "file_id", None)) red.add_send_cache(unique, getattr(obj, "file_id", None))
@ -387,12 +403,33 @@ def hot_patch(*args):
psutil.Process().kill() psutil.Process().kill()
def async_task(task_name, *args):
if not ENABLE_QUEUE:
task_name.delay(*args)
return
t0 = time.time()
inspect = app.control.inspect()
worker_stats = inspect.stats()
route_queues = []
for worker_name, stats in worker_stats.items():
route = worker_name.split('@')[1]
concurrency = stats['pool']['max-concurrency']
route_queues.extend([route] * concurrency)
destination = random.choice(route_queues)
logging.info("Selecting worker %s from %s in %.2fs", destination, route_queues, time.time() - t0)
task_name.apply_async(args=args, queue=destination)
def run_celery(): def run_celery():
worker_name = os.getenv("WORKER_NAME", "")
argv = [ argv = [
"-A", "tasks", 'worker', '--loglevel=info', "-A", "tasks", 'worker', '--loglevel=info',
"--pool=threads", f"--concurrency={WORKERS}", "--pool=threads", f"--concurrency={WORKERS}",
"-n", os.getenv("WORKER_NAME", "") "-n", worker_name
] ]
if ENABLE_QUEUE:
argv.extend(["-Q", worker_name])
app.worker_main(argv) app.worker_main(argv)

View File

@ -15,13 +15,14 @@ import time
import traceback import traceback
import typing import typing
from io import BytesIO from io import BytesIO
from youtubesearchpython import VideosSearch
import pyrogram.errors import pyrogram.errors
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from pyrogram import Client, filters, types from pyrogram import Client, filters, types
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from tgbot_ping import get_runtime from tgbot_ping import get_runtime
from youtubesearchpython import VideosSearch
from client_init import create_app from client_init import create_app
from config import (AUTHORIZED_USER, ENABLE_CELERY, ENABLE_VIP, OWNER, from config import (AUTHORIZED_USER, ENABLE_CELERY, ENABLE_VIP, OWNER,