From 6bff673a36b503d240aec7228cfb56972705d2ac Mon Sep 17 00:00:00 2001 From: BennyThink Date: Wed, 29 Dec 2021 16:57:06 +0800 Subject: [PATCH] use celery --- .dockerignore | 2 + .gitignore | 3 + Dockerfile | 4 +- Procfile | 2 +- supervisor.conf => conf/supervisor.conf | 8 +- docker-compose.yml | 21 ++++- requirements.txt | 3 +- broadcast.py => ytdlbot/broadcast.py | 2 +- ytdlbot/client_init.py | 21 +++++ config.py => ytdlbot/config.py | 10 ++- constant.py => ytdlbot/constant.py | 0 db.py => ytdlbot/db.py | 0 downloader.py => ytdlbot/downloader.py | 0 limit.py => ytdlbot/limit.py | 2 +- ytdlbot/tasks.py | 108 ++++++++++++++++++++++++ utils.py => ytdlbot/utils.py | 18 ++++ ytdl.py => ytdlbot/ytdl_bot.py | 92 ++------------------ 17 files changed, 199 insertions(+), 97 deletions(-) create mode 100644 .dockerignore rename supervisor.conf => conf/supervisor.conf (65%) rename broadcast.py => ytdlbot/broadcast.py (97%) create mode 100644 ytdlbot/client_init.py rename config.py => ytdlbot/config.py (78%) rename constant.py => ytdlbot/constant.py (100%) rename db.py => ytdlbot/db.py (100%) rename downloader.py => ytdlbot/downloader.py (100%) rename limit.py => ytdlbot/limit.py (99%) create mode 100644 ytdlbot/tasks.py rename utils.py => ytdlbot/utils.py (77%) rename ytdl.py => ytdlbot/ytdl_bot.py (71%) diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..4057a7e --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +env +db_data \ No newline at end of file diff --git a/.gitignore b/.gitignore index 8127a19..a64a174 100644 --- a/.gitignore +++ b/.gitignore @@ -151,3 +151,6 @@ dmypy.json /.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i /.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i.len /.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/storage_v2/_src_/schema/main.uQUzAA.meta +db_data/* +env/* +.ash_history \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 92ae4dd..8e6766a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt FROM python:3.9-alpine -WORKDIR /ytdlbot +WORKDIR /ytdlbot/ytdlbot ENV TZ=Asia/Shanghai RUN apk update && apk add --no-cache ffmpeg vnstat @@ -15,4 +15,4 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo COPY . /ytdlbot -CMD ["/usr/local/bin/supervisord", "-c" ,"/ytdlbot/supervisor.conf"] \ No newline at end of file +CMD ["/usr/local/bin/supervisord", "-c" ,"/ytdlbot/conf/supervisor.conf"] \ No newline at end of file diff --git a/Procfile b/Procfile index c51cd2f..20b6b4b 100644 --- a/Procfile +++ b/Procfile @@ -1 +1 @@ -worker: python ytdl.py \ No newline at end of file +worker: python ytdl_bot.py \ No newline at end of file diff --git a/supervisor.conf b/conf/supervisor.conf similarity index 65% rename from supervisor.conf rename to conf/supervisor.conf index 744f5ce..b45022e 100644 --- a/supervisor.conf +++ b/conf/supervisor.conf @@ -9,10 +9,10 @@ command=vnstatd -n autorestart=true [program:ytdl] -directory=/ytdlbot -command=python ytdl.py +directory=/ytdlbot/ytdlbot/ +command=python ytdl_bot.py autorestart=true +redirect_stderr=true stdout_logfile=/dev/fd/1 -stdout_logfile_maxbytes=0 -redirect_stderr=true \ No newline at end of file +stdout_logfile_maxbytes=0 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 66e1aed..4eb317b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,13 +14,28 @@ services: logging: driver: none + mysql: + image: mysql:5.7 + restart: always + volumes: + - ./db_data:/var/lib/mysql + environment: + MYSQL_ROOT_PASSWORD: 'root' + logging: + driver: none + ytdl: image: bennythink/ytdlbot env_file: - env/ytdl.env - volumes: - - ./data/vip.sqlite:/ytdlbot/vip.sqlite restart: always depends_on: - socat - - redis \ No newline at end of file + - redis + + worker: + image: bennythink/ytdlbot + env_file: + - env/ytdl.env + restart: always + command: [ "/usr/local/bin/celery" ,"-A","tasks","worker","--loglevel=info","--pool=prefork","--concurrency=200" ] diff --git a/requirements.txt b/requirements.txt index 050c661..bc6d28e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pyrogram==1.2.11 +pyrogram==1.2.20 tgcrypto==1.2.2 yt-dlp==2021.12.1 youtube-dl==2021.6.6 @@ -6,6 +6,7 @@ APScheduler==3.7.0 beautifultable==1.0.1 ffmpeg-python==0.2.0 PyMySQL==1.0.2 +celery==5.2.2 supervisor tgbot-ping diff --git a/broadcast.py b/ytdlbot/broadcast.py similarity index 97% rename from broadcast.py rename to ytdlbot/broadcast.py index e2bf1f8..91d9a4b 100644 --- a/broadcast.py +++ b/ytdlbot/broadcast.py @@ -18,7 +18,7 @@ import time from tqdm import tqdm from db import Redis -from ytdl import create_app +from ytdl_bot import create_app parser = argparse.ArgumentParser(description='Broadcast to users') parser.add_argument('-m', help='message', required=True) diff --git a/ytdlbot/client_init.py b/ytdlbot/client_init.py new file mode 100644 index 0000000..5972514 --- /dev/null +++ b/ytdlbot/client_init.py @@ -0,0 +1,21 @@ +#!/usr/local/bin/python3 +# coding: utf-8 + +# ytdlbot - client_init.py +# 12/29/21 16:20 +# + +__author__ = "Benny " + +from pyrogram import Client + +from config import APP_HASH, APP_ID, TOKEN, WORKERS + + +def create_app(session="ytdl", workers=WORKERS): + _app = Client(session, APP_ID, APP_HASH, + bot_token=TOKEN, workers=workers, + # proxy={"hostname": "host.docker.internal", "port": 1086} + ) + + return _app diff --git a/config.py b/ytdlbot/config.py similarity index 78% rename from config.py rename to ytdlbot/config.py index 3bd2e00..25ca9ba 100644 --- a/config.py +++ b/ytdlbot/config.py @@ -10,10 +10,11 @@ __author__ = "Benny " import os # general settings -WORKERS: "int" = int(os.getenv("WORKERS", 500)) +WORKERS: "int" = int(os.getenv("WORKERS", 200)) APP_ID: "int" = int(os.getenv("APP_ID", 111)) APP_HASH = os.getenv("APP_HASH", "111") TOKEN = os.getenv("TOKEN", "3703WLI") + REDIS = os.getenv("REDIS") # quota settings @@ -37,3 +38,10 @@ OWNER = os.getenv("OWNER", "BennyThink") AUTHORIZED_USER: "str" = os.getenv("AUTHORIZED", "") # membership requires: the format could be username/chat_id of channel or group REQUIRED_MEMBERSHIP: "str" = os.getenv("REQUIRED_MEMBERSHIP", "") + +# celery related +ENABLE_CELERY = os.getenv("ENABLE_CELERY", False) +BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4") +MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost") +MYSQL_USER = os.getenv("MYSQL_USER", "root") +MYSQL_PASS = os.getenv("MYSQL_PASS", "root") diff --git a/constant.py b/ytdlbot/constant.py similarity index 100% rename from constant.py rename to ytdlbot/constant.py diff --git a/db.py b/ytdlbot/db.py similarity index 100% rename from db.py rename to ytdlbot/db.py diff --git a/downloader.py b/ytdlbot/downloader.py similarity index 100% rename from downloader.py rename to ytdlbot/downloader.py diff --git a/limit.py b/ytdlbot/limit.py similarity index 99% rename from limit.py rename to ytdlbot/limit.py index 2d59a4a..85f4744 100644 --- a/limit.py +++ b/ytdlbot/limit.py @@ -24,7 +24,7 @@ apply_log_formatter() def get_username(chat_id): - from ytdl import create_app + from ytdl_bot import create_app with tempfile.NamedTemporaryFile() as tmp: with create_app(tmp.name, 1) as app: data = app.get_chat(chat_id).first_name diff --git a/ytdlbot/tasks.py b/ytdlbot/tasks.py new file mode 100644 index 0000000..ec3f6d6 --- /dev/null +++ b/ytdlbot/tasks.py @@ -0,0 +1,108 @@ +#!/usr/local/bin/python3 +# coding: utf-8 + +# ytdlbot - tasks.py +# 12/29/21 14:57 +# + +__author__ = "Benny " + +import logging +import os +import pathlib +import tempfile +import time + +from celery import Celery +from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup + +from client_init import create_app +from config import BROKER, ENABLE_CELERY +from constant import BotText +from db import Redis +from downloader import sizeof_fmt, upload_hook, ytdl_download +from utils import get_metadata, get_user_settings + +bot_text = BotText() + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s') +# celery -A tasks worker --loglevel=info --pool=solo + +# app = Celery('celery', broker=BROKER, accept_content=['pickle'], task_serializer='pickle') +app = Celery('celery', broker=BROKER) + +celery_client = create_app(app.main, 5) + + +@app.task() +def download_task(chat_id, message_id, url): + logging.info("celery tasks started for %s", url) + with celery_client: + bot_msg = celery_client.get_messages(chat_id, message_id) + normal_download(bot_msg, celery_client, url) + logging.info("celery tasks ended.") + + +def download_entrance(bot_msg, client, url): + if ENABLE_CELERY: + download_task.delay(bot_msg.chat.id, bot_msg.message_id, url) + else: + normal_download(bot_msg, client, url) + + +def normal_download(bot_msg, client, url): + chat_id = bot_msg.chat.id + temp_dir = tempfile.TemporaryDirectory() + + result = ytdl_download(url, temp_dir.name, bot_msg) + logging.info("Download complete.") + markup = InlineKeyboardMarkup( + [ + [ # First row + InlineKeyboardButton( # Generates a callback query when pressed + "audio", + callback_data="audio" + ) + ] + ] + ) + if result["status"]: + client.send_chat_action(chat_id, 'upload_document') + video_paths = result["filepath"] + bot_msg.edit_text('Download complete. Sending now...') + for video_path in video_paths: + filename = pathlib.Path(video_path).name + remain = bot_text.remaining_quota_caption(chat_id) + size = sizeof_fmt(os.stat(video_path).st_size) + meta = get_metadata(video_path) + cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size}\n\n{remain}" + settings = get_user_settings(str(chat_id)) + if settings[2] == "document": + logging.info("Sending as document") + client.send_document(chat_id, video_path, + caption=cap, + progress=upload_hook, progress_args=(bot_msg,), + reply_markup=markup, + thumb=meta["thumb"] + ) + else: + logging.info("Sending as video") + client.send_video(chat_id, video_path, + supports_streaming=True, + caption=cap, + progress=upload_hook, progress_args=(bot_msg,), + reply_markup=markup, + **meta + ) + Redis().update_metrics("video_success") + bot_msg.edit_text('Download success!✅') + else: + client.send_chat_action(chat_id, 'typing') + tb = result["error"][0:4000] + bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True) + + temp_dir.cleanup() + + +if __name__ == '__main__': + download_task("", "", "") diff --git a/utils.py b/ytdlbot/utils.py similarity index 77% rename from utils.py rename to ytdlbot/utils.py index 8f7446a..2868e36 100644 --- a/utils.py +++ b/ytdlbot/utils.py @@ -9,6 +9,8 @@ __author__ = "Benny " import logging +import ffmpeg + from db import MySQL @@ -70,3 +72,19 @@ def adjust_formats(user_id: "str", url: "str", formats: "list"): for m in mapping.get(settings[1], []): formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]") formats.insert(1, f"bestvideo[vcodec^=avc][height={m}]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best") + + +def get_metadata(video_path): + width, height, duration = 1280, 720, 0 + try: + video_streams = ffmpeg.probe(video_path, select_streams="v") + for item in video_streams.get("streams", []): + height = item["height"] + width = item["width"] + duration = int(float(video_streams["format"]["duration"])) + except Exception as e: + logging.error(e) + + thumb = video_path + "-thunmnail.png" + ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run() + return dict(height=height, width=width, duration=duration, thumb=thumb) \ No newline at end of file diff --git a/ytdl.py b/ytdlbot/ytdl_bot.py similarity index 71% rename from ytdl.py rename to ytdlbot/ytdl_bot.py index e919e9a..b465c2b 100644 --- a/ytdl.py +++ b/ytdlbot/ytdl_bot.py @@ -9,36 +9,26 @@ __author__ = "Benny " import logging import os -import pathlib import re import tempfile import typing -import ffmpeg from apscheduler.schedulers.background import BackgroundScheduler from pyrogram import Client, filters, types from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from tgbot_ping import get_runtime -from config import (APP_HASH, APP_ID, AUTHORIZED_USER, ENABLE_VIP, OWNER, - REQUIRED_MEMBERSHIP, TOKEN, WORKERS) +from client_init import create_app +from config import (AUTHORIZED_USER, ENABLE_CELERY, ENABLE_VIP, OWNER, + REQUIRED_MEMBERSHIP) from constant import BotText from db import MySQL, Redis -from downloader import convert_flac, sizeof_fmt, upload_hook, ytdl_download +from downloader import convert_flac from limit import verify_payment +from tasks import download_entrance from utils import customize_logger, get_user_settings, set_user_settings - -def create_app(session="ytdl", workers=WORKERS): - _app = Client(session, APP_ID, APP_HASH, - bot_token=TOKEN, workers=workers, - proxy={'hostname': '127.0.0.1', 'port': 1086} - ) - - return _app - - customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.client", "pyrogram.connection.connection"]) app = create_app() bot_text = BotText() @@ -46,22 +36,6 @@ bot_text = BotText() logging.info("Authorized users are %s", AUTHORIZED_USER) -def get_metadata(video_path): - width, height, duration = 1280, 720, 0 - try: - video_streams = ffmpeg.probe(video_path, select_streams="v") - for item in video_streams.get("streams", []): - height = item["height"] - width = item["width"] - duration = int(float(video_streams["format"]["duration"])) - except Exception as e: - logging.error(e) - - thumb = video_path + "-thunmnail.png" - ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run() - return dict(height=height, width=width, duration=duration, thumb=thumb) - - def private_use(func): def wrapper(client: "Client", message: "types.Message"): chat_id = getattr(message.from_user, "id", None) @@ -194,58 +168,10 @@ def download_handler(client: "Client", message: "types.Message"): Redis().update_metrics("video_request") bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text("Processing", quote=True) client.send_chat_action(chat_id, 'upload_video') - temp_dir = tempfile.TemporaryDirectory() + # temp_dir = tempfile.TemporaryDirectory() + download_entrance(bot_msg, client, url) - result = ytdl_download(url, temp_dir.name, bot_msg) - logging.info("Download complete.") - - markup = InlineKeyboardMarkup( - [ - [ # First row - InlineKeyboardButton( # Generates a callback query when pressed - "audio", - callback_data="audio" - ) - ] - ] - ) - - if result["status"]: - client.send_chat_action(chat_id, 'upload_document') - video_paths = result["filepath"] - bot_msg.edit_text('Download complete. Sending now...') - for video_path in video_paths: - filename = pathlib.Path(video_path).name - remain = bot_text.remaining_quota_caption(chat_id) - size = sizeof_fmt(os.stat(video_path).st_size) - meta = get_metadata(video_path) - cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size}\n\n{remain}" - settings = get_user_settings(str(chat_id)) - if settings[2] == "document": - logging.info("Sending as document") - client.send_document(chat_id, video_path, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - reply_markup=markup, - thumb=meta["thumb"] - ) - else: - logging.info("Sending as video") - client.send_video(chat_id, video_path, - supports_streaming=True, - caption=cap, - progress=upload_hook, progress_args=(bot_msg,), - reply_markup=markup, - **meta - ) - Redis().update_metrics("video_success") - bot_msg.edit_text('Download success!✅') - else: - client.send_chat_action(chat_id, 'typing') - tb = result["error"][0:4000] - bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True) - - temp_dir.cleanup() + # temp_dir.cleanup() @app.on_callback_query(filters.regex(r"document|video")) @@ -302,7 +228,7 @@ if __name__ == '__main__': ▌ ▌ ▌ ▌ ▌ ▌ ▌ ▌ ▌ ▌ ▛▀ ▌ ▌ ▌ ▌ ▐▐▐ ▌ ▌ ▐ ▌ ▌ ▞▀▌ ▌ ▌ ▘ ▝▀ ▝▀▘ ▘ ▝▀▘ ▀▀ ▝▀▘ ▀▀ ▝▀ ▘▘ ▘ ▘ ▘ ▝▀ ▝▀▘ ▝▀▘ -By @BennyThink, VIP mode: {ENABLE_VIP} +By @BennyThink, VIP mode: {ENABLE_VIP}, Distribution: {ENABLE_CELERY} """ print(banner) app.run()