use celery

This commit is contained in:
BennyThink 2021-12-29 16:57:06 +08:00
parent c755f392de
commit 6bff673a36
No known key found for this signature in database
GPG Key ID: 6CD0DBDA5235D481
17 changed files with 199 additions and 97 deletions

2
.dockerignore Normal file
View File

@ -0,0 +1,2 @@
env
db_data

3
.gitignore vendored
View File

@ -151,3 +151,6 @@ dmypy.json
/.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i /.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i
/.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i.len /.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i.len
/.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/storage_v2/_src_/schema/main.uQUzAA.meta /.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/storage_v2/_src_/schema/main.uQUzAA.meta
db_data/*
env/*
.ash_history

View File

@ -6,7 +6,7 @@ RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt
FROM python:3.9-alpine FROM python:3.9-alpine
WORKDIR /ytdlbot WORKDIR /ytdlbot/ytdlbot
ENV TZ=Asia/Shanghai ENV TZ=Asia/Shanghai
RUN apk update && apk add --no-cache ffmpeg vnstat RUN apk update && apk add --no-cache ffmpeg vnstat
@ -15,4 +15,4 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
COPY . /ytdlbot COPY . /ytdlbot
CMD ["/usr/local/bin/supervisord", "-c" ,"/ytdlbot/supervisor.conf"] CMD ["/usr/local/bin/supervisord", "-c" ,"/ytdlbot/conf/supervisor.conf"]

View File

@ -1 +1 @@
worker: python ytdl.py worker: python ytdl_bot.py

View File

@ -9,10 +9,10 @@ command=vnstatd -n
autorestart=true autorestart=true
[program:ytdl] [program:ytdl]
directory=/ytdlbot directory=/ytdlbot/ytdlbot/
command=python ytdl.py command=python ytdl_bot.py
autorestart=true autorestart=true
redirect_stderr=true
stdout_logfile=/dev/fd/1 stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0 stdout_logfile_maxbytes=0
redirect_stderr=true

View File

@ -14,13 +14,28 @@ services:
logging: logging:
driver: none driver: none
mysql:
image: mysql:5.7
restart: always
volumes:
- ./db_data:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: 'root'
logging:
driver: none
ytdl: ytdl:
image: bennythink/ytdlbot image: bennythink/ytdlbot
env_file: env_file:
- env/ytdl.env - env/ytdl.env
volumes:
- ./data/vip.sqlite:/ytdlbot/vip.sqlite
restart: always restart: always
depends_on: depends_on:
- socat - socat
- redis - redis
worker:
image: bennythink/ytdlbot
env_file:
- env/ytdl.env
restart: always
command: [ "/usr/local/bin/celery" ,"-A","tasks","worker","--loglevel=info","--pool=prefork","--concurrency=200" ]

View File

@ -1,4 +1,4 @@
pyrogram==1.2.11 pyrogram==1.2.20
tgcrypto==1.2.2 tgcrypto==1.2.2
yt-dlp==2021.12.1 yt-dlp==2021.12.1
youtube-dl==2021.6.6 youtube-dl==2021.6.6
@ -6,6 +6,7 @@ APScheduler==3.7.0
beautifultable==1.0.1 beautifultable==1.0.1
ffmpeg-python==0.2.0 ffmpeg-python==0.2.0
PyMySQL==1.0.2 PyMySQL==1.0.2
celery==5.2.2
supervisor supervisor
tgbot-ping tgbot-ping

View File

@ -18,7 +18,7 @@ import time
from tqdm import tqdm from tqdm import tqdm
from db import Redis from db import Redis
from ytdl import create_app from ytdl_bot import create_app
parser = argparse.ArgumentParser(description='Broadcast to users') parser = argparse.ArgumentParser(description='Broadcast to users')
parser.add_argument('-m', help='message', required=True) parser.add_argument('-m', help='message', required=True)

21
ytdlbot/client_init.py Normal file
View File

@ -0,0 +1,21 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - client_init.py
# 12/29/21 16:20
#
__author__ = "Benny <benny.think@gmail.com>"
from pyrogram import Client
from config import APP_HASH, APP_ID, TOKEN, WORKERS
def create_app(session="ytdl", workers=WORKERS):
_app = Client(session, APP_ID, APP_HASH,
bot_token=TOKEN, workers=workers,
# proxy={"hostname": "host.docker.internal", "port": 1086}
)
return _app

View File

@ -10,10 +10,11 @@ __author__ = "Benny <benny.think@gmail.com>"
import os import os
# general settings # general settings
WORKERS: "int" = int(os.getenv("WORKERS", 500)) WORKERS: "int" = int(os.getenv("WORKERS", 200))
APP_ID: "int" = int(os.getenv("APP_ID", 111)) APP_ID: "int" = int(os.getenv("APP_ID", 111))
APP_HASH = os.getenv("APP_HASH", "111") APP_HASH = os.getenv("APP_HASH", "111")
TOKEN = os.getenv("TOKEN", "3703WLI") TOKEN = os.getenv("TOKEN", "3703WLI")
REDIS = os.getenv("REDIS") REDIS = os.getenv("REDIS")
# quota settings # quota settings
@ -37,3 +38,10 @@ OWNER = os.getenv("OWNER", "BennyThink")
AUTHORIZED_USER: "str" = os.getenv("AUTHORIZED", "") AUTHORIZED_USER: "str" = os.getenv("AUTHORIZED", "")
# membership requires: the format could be username/chat_id of channel or group # membership requires: the format could be username/chat_id of channel or group
REQUIRED_MEMBERSHIP: "str" = os.getenv("REQUIRED_MEMBERSHIP", "") REQUIRED_MEMBERSHIP: "str" = os.getenv("REQUIRED_MEMBERSHIP", "")
# celery related
ENABLE_CELERY = os.getenv("ENABLE_CELERY", False)
BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4")
MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost")
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASS = os.getenv("MYSQL_PASS", "root")

View File

@ -24,7 +24,7 @@ apply_log_formatter()
def get_username(chat_id): def get_username(chat_id):
from ytdl import create_app from ytdl_bot import create_app
with tempfile.NamedTemporaryFile() as tmp: with tempfile.NamedTemporaryFile() as tmp:
with create_app(tmp.name, 1) as app: with create_app(tmp.name, 1) as app:
data = app.get_chat(chat_id).first_name data = app.get_chat(chat_id).first_name

108
ytdlbot/tasks.py Normal file
View File

@ -0,0 +1,108 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - tasks.py
# 12/29/21 14:57
#
__author__ = "Benny <benny.think@gmail.com>"
import logging
import os
import pathlib
import tempfile
import time
from celery import Celery
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from client_init import create_app
from config import BROKER, ENABLE_CELERY
from constant import BotText
from db import Redis
from downloader import sizeof_fmt, upload_hook, ytdl_download
from utils import get_metadata, get_user_settings
bot_text = BotText()
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s')
# celery -A tasks worker --loglevel=info --pool=solo
# app = Celery('celery', broker=BROKER, accept_content=['pickle'], task_serializer='pickle')
app = Celery('celery', broker=BROKER)
celery_client = create_app(app.main, 5)
@app.task()
def download_task(chat_id, message_id, url):
logging.info("celery tasks started for %s", url)
with celery_client:
bot_msg = celery_client.get_messages(chat_id, message_id)
normal_download(bot_msg, celery_client, url)
logging.info("celery tasks ended.")
def download_entrance(bot_msg, client, url):
if ENABLE_CELERY:
download_task.delay(bot_msg.chat.id, bot_msg.message_id, url)
else:
normal_download(bot_msg, client, url)
def normal_download(bot_msg, client, url):
chat_id = bot_msg.chat.id
temp_dir = tempfile.TemporaryDirectory()
result = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
markup = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"audio",
callback_data="audio"
)
]
]
)
if result["status"]:
client.send_chat_action(chat_id, 'upload_document')
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
for video_path in video_paths:
filename = pathlib.Path(video_path).name
remain = bot_text.remaining_quota_caption(chat_id)
size = sizeof_fmt(os.stat(video_path).st_size)
meta = get_metadata(video_path)
cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size}\n\n{remain}"
settings = get_user_settings(str(chat_id))
if settings[2] == "document":
logging.info("Sending as document")
client.send_document(chat_id, video_path,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"]
)
else:
logging.info("Sending as video")
client.send_video(chat_id, video_path,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
Redis().update_metrics("video_success")
bot_msg.edit_text('Download success!✅')
else:
client.send_chat_action(chat_id, 'typing')
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
temp_dir.cleanup()
if __name__ == '__main__':
download_task("", "", "")

View File

@ -9,6 +9,8 @@ __author__ = "Benny <benny.think@gmail.com>"
import logging import logging
import ffmpeg
from db import MySQL from db import MySQL
@ -70,3 +72,19 @@ def adjust_formats(user_id: "str", url: "str", formats: "list"):
for m in mapping.get(settings[1], []): for m in mapping.get(settings[1], []):
formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]") formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]")
formats.insert(1, f"bestvideo[vcodec^=avc][height={m}]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best") formats.insert(1, f"bestvideo[vcodec^=avc][height={m}]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best")
def get_metadata(video_path):
width, height, duration = 1280, 720, 0
try:
video_streams = ffmpeg.probe(video_path, select_streams="v")
for item in video_streams.get("streams", []):
height = item["height"]
width = item["width"]
duration = int(float(video_streams["format"]["duration"]))
except Exception as e:
logging.error(e)
thumb = video_path + "-thunmnail.png"
ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run()
return dict(height=height, width=width, duration=duration, thumb=thumb)

View File

@ -9,36 +9,26 @@ __author__ = "Benny <benny.think@gmail.com>"
import logging import logging
import os import os
import pathlib
import re import re
import tempfile import tempfile
import typing import typing
import ffmpeg
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from pyrogram import Client, filters, types from pyrogram import Client, filters, types
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from tgbot_ping import get_runtime from tgbot_ping import get_runtime
from config import (APP_HASH, APP_ID, AUTHORIZED_USER, ENABLE_VIP, OWNER, from client_init import create_app
REQUIRED_MEMBERSHIP, TOKEN, WORKERS) from config import (AUTHORIZED_USER, ENABLE_CELERY, ENABLE_VIP, OWNER,
REQUIRED_MEMBERSHIP)
from constant import BotText from constant import BotText
from db import MySQL, Redis from db import MySQL, Redis
from downloader import convert_flac, sizeof_fmt, upload_hook, ytdl_download from downloader import convert_flac
from limit import verify_payment from limit import verify_payment
from tasks import download_entrance
from utils import customize_logger, get_user_settings, set_user_settings from utils import customize_logger, get_user_settings, set_user_settings
def create_app(session="ytdl", workers=WORKERS):
_app = Client(session, APP_ID, APP_HASH,
bot_token=TOKEN, workers=workers,
proxy={'hostname': '127.0.0.1', 'port': 1086}
)
return _app
customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.client", "pyrogram.connection.connection"]) customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.client", "pyrogram.connection.connection"])
app = create_app() app = create_app()
bot_text = BotText() bot_text = BotText()
@ -46,22 +36,6 @@ bot_text = BotText()
logging.info("Authorized users are %s", AUTHORIZED_USER) logging.info("Authorized users are %s", AUTHORIZED_USER)
def get_metadata(video_path):
width, height, duration = 1280, 720, 0
try:
video_streams = ffmpeg.probe(video_path, select_streams="v")
for item in video_streams.get("streams", []):
height = item["height"]
width = item["width"]
duration = int(float(video_streams["format"]["duration"]))
except Exception as e:
logging.error(e)
thumb = video_path + "-thunmnail.png"
ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run()
return dict(height=height, width=width, duration=duration, thumb=thumb)
def private_use(func): def private_use(func):
def wrapper(client: "Client", message: "types.Message"): def wrapper(client: "Client", message: "types.Message"):
chat_id = getattr(message.from_user, "id", None) chat_id = getattr(message.from_user, "id", None)
@ -194,58 +168,10 @@ def download_handler(client: "Client", message: "types.Message"):
Redis().update_metrics("video_request") Redis().update_metrics("video_request")
bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text("Processing", quote=True) bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text("Processing", quote=True)
client.send_chat_action(chat_id, 'upload_video') client.send_chat_action(chat_id, 'upload_video')
temp_dir = tempfile.TemporaryDirectory() # temp_dir = tempfile.TemporaryDirectory()
download_entrance(bot_msg, client, url)
result = ytdl_download(url, temp_dir.name, bot_msg) # temp_dir.cleanup()
logging.info("Download complete.")
markup = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"audio",
callback_data="audio"
)
]
]
)
if result["status"]:
client.send_chat_action(chat_id, 'upload_document')
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
for video_path in video_paths:
filename = pathlib.Path(video_path).name
remain = bot_text.remaining_quota_caption(chat_id)
size = sizeof_fmt(os.stat(video_path).st_size)
meta = get_metadata(video_path)
cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size}\n\n{remain}"
settings = get_user_settings(str(chat_id))
if settings[2] == "document":
logging.info("Sending as document")
client.send_document(chat_id, video_path,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"]
)
else:
logging.info("Sending as video")
client.send_video(chat_id, video_path,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
Redis().update_metrics("video_success")
bot_msg.edit_text('Download success!✅')
else:
client.send_chat_action(chat_id, 'typing')
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
temp_dir.cleanup()
@app.on_callback_query(filters.regex(r"document|video")) @app.on_callback_query(filters.regex(r"document|video"))
@ -302,7 +228,7 @@ if __name__ == '__main__':
By @BennyThink, VIP mode: {ENABLE_VIP} By @BennyThink, VIP mode: {ENABLE_VIP}, Distribution: {ENABLE_CELERY}
""" """
print(banner) print(banner)
app.run() app.run()