use celery

This commit is contained in:
BennyThink 2021-12-29 16:57:06 +08:00
parent c755f392de
commit c96180540a
No known key found for this signature in database
GPG Key ID: 6CD0DBDA5235D481
17 changed files with 199 additions and 97 deletions

2
.dockerignore Normal file
View File

@ -0,0 +1,2 @@
env
db_data

3
.gitignore vendored
View File

@ -151,3 +151,6 @@ dmypy.json
/.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i
/.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/entities/entities.dat_i.len
/.idea/dataSources/bf75f0a6-c774-4ecf-9448-2086f57b70df/storage_v2/_src_/schema/main.uQUzAA.meta
db_data/*
env/*
.ash_history

View File

@ -6,7 +6,7 @@ RUN pip3 install --user -r /tmp/requirements.txt && rm /tmp/requirements.txt
FROM python:3.9-alpine
WORKDIR /ytdlbot
WORKDIR /ytdlbot/ytdlbot
ENV TZ=Asia/Shanghai
RUN apk update && apk add --no-cache ffmpeg vnstat
@ -15,4 +15,4 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
COPY . /ytdlbot
CMD ["/usr/local/bin/supervisord", "-c" ,"/ytdlbot/supervisor.conf"]
CMD ["/usr/local/bin/supervisord", "-c" ,"/ytdlbot/conf/supervisor.conf"]

View File

@ -1 +1 @@
worker: python ytdl.py
worker: python ytdl_bot.py

View File

@ -9,10 +9,10 @@ command=vnstatd -n
autorestart=true
[program:ytdl]
directory=/ytdlbot
command=python ytdl.py
directory=/ytdlbot/ytdlbot/
command=python ytdl_bot.py
autorestart=true
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
redirect_stderr=true
stdout_logfile_maxbytes=0

View File

@ -14,13 +14,28 @@ services:
logging:
driver: none
mysql:
image: mysql:5.7
restart: always
volumes:
- ./db_data:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: 'root'
logging:
driver: none
ytdl:
image: bennythink/ytdlbot
env_file:
- env/ytdl.env
volumes:
- ./data/vip.sqlite:/ytdlbot/vip.sqlite
restart: always
depends_on:
- socat
- redis
- redis
worker:
image: bennythink/ytdlbot
env_file:
- env/ytdl.env
restart: always
command: [ "/usr/local/bin/celery" ,"-A","tasks","worker","--loglevel=info","--pool=prefork","--concurrency=200" ]

View File

@ -1,4 +1,4 @@
pyrogram==1.2.11
pyrogram==1.2.20
tgcrypto==1.2.2
yt-dlp==2021.12.1
youtube-dl==2021.6.6
@ -6,6 +6,7 @@ APScheduler==3.7.0
beautifultable==1.0.1
ffmpeg-python==0.2.0
PyMySQL==1.0.2
celery==5.2.2
supervisor
tgbot-ping

View File

@ -18,7 +18,7 @@ import time
from tqdm import tqdm
from db import Redis
from ytdl import create_app
from ytdl_bot import create_app
parser = argparse.ArgumentParser(description='Broadcast to users')
parser.add_argument('-m', help='message', required=True)

21
ytdlbot/client_init.py Normal file
View File

@ -0,0 +1,21 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - client_init.py
# 12/29/21 16:20
#
__author__ = "Benny <benny.think@gmail.com>"
from pyrogram import Client
from config import APP_HASH, APP_ID, TOKEN, WORKERS
def create_app(session="ytdl", workers=WORKERS):
_app = Client(session, APP_ID, APP_HASH,
bot_token=TOKEN, workers=workers,
# proxy={"hostname": "host.docker.internal", "port": 1086}
)
return _app

View File

@ -10,10 +10,11 @@ __author__ = "Benny <benny.think@gmail.com>"
import os
# general settings
WORKERS: "int" = int(os.getenv("WORKERS", 500))
WORKERS: "int" = int(os.getenv("WORKERS", 200))
APP_ID: "int" = int(os.getenv("APP_ID", 111))
APP_HASH = os.getenv("APP_HASH", "111")
TOKEN = os.getenv("TOKEN", "3703WLI")
REDIS = os.getenv("REDIS")
# quota settings
@ -37,3 +38,10 @@ OWNER = os.getenv("OWNER", "BennyThink")
AUTHORIZED_USER: "str" = os.getenv("AUTHORIZED", "")
# membership requires: the format could be username/chat_id of channel or group
REQUIRED_MEMBERSHIP: "str" = os.getenv("REQUIRED_MEMBERSHIP", "")
# celery related
ENABLE_CELERY = os.getenv("ENABLE_CELERY", False)
BROKER = os.getenv("BROKER", f"redis://{REDIS}:6379/4")
MYSQL_HOST = os.getenv("MYSQL_HOST", "localhost")
MYSQL_USER = os.getenv("MYSQL_USER", "root")
MYSQL_PASS = os.getenv("MYSQL_PASS", "root")

View File

@ -24,7 +24,7 @@ apply_log_formatter()
def get_username(chat_id):
from ytdl import create_app
from ytdl_bot import create_app
with tempfile.NamedTemporaryFile() as tmp:
with create_app(tmp.name, 1) as app:
data = app.get_chat(chat_id).first_name

108
ytdlbot/tasks.py Normal file
View File

@ -0,0 +1,108 @@
#!/usr/local/bin/python3
# coding: utf-8
# ytdlbot - tasks.py
# 12/29/21 14:57
#
__author__ = "Benny <benny.think@gmail.com>"
import logging
import os
import pathlib
import tempfile
import time
from celery import Celery
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from client_init import create_app
from config import BROKER, ENABLE_CELERY
from constant import BotText
from db import Redis
from downloader import sizeof_fmt, upload_hook, ytdl_download
from utils import get_metadata, get_user_settings
bot_text = BotText()
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s')
# celery -A tasks worker --loglevel=info --pool=solo
# app = Celery('celery', broker=BROKER, accept_content=['pickle'], task_serializer='pickle')
app = Celery('celery', broker=BROKER)
celery_client = create_app(app.main, 5)
@app.task()
def download_task(chat_id, message_id, url):
logging.info("celery tasks started for %s", url)
with celery_client:
bot_msg = celery_client.get_messages(chat_id, message_id)
normal_download(bot_msg, celery_client, url)
logging.info("celery tasks ended.")
def download_entrance(bot_msg, client, url):
if ENABLE_CELERY:
download_task.delay(bot_msg.chat.id, bot_msg.message_id, url)
else:
normal_download(bot_msg, client, url)
def normal_download(bot_msg, client, url):
chat_id = bot_msg.chat.id
temp_dir = tempfile.TemporaryDirectory()
result = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
markup = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"audio",
callback_data="audio"
)
]
]
)
if result["status"]:
client.send_chat_action(chat_id, 'upload_document')
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
for video_path in video_paths:
filename = pathlib.Path(video_path).name
remain = bot_text.remaining_quota_caption(chat_id)
size = sizeof_fmt(os.stat(video_path).st_size)
meta = get_metadata(video_path)
cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size}\n\n{remain}"
settings = get_user_settings(str(chat_id))
if settings[2] == "document":
logging.info("Sending as document")
client.send_document(chat_id, video_path,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"]
)
else:
logging.info("Sending as video")
client.send_video(chat_id, video_path,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
Redis().update_metrics("video_success")
bot_msg.edit_text('Download success!✅')
else:
client.send_chat_action(chat_id, 'typing')
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
temp_dir.cleanup()
if __name__ == '__main__':
download_task("", "", "")

View File

@ -9,6 +9,8 @@ __author__ = "Benny <benny.think@gmail.com>"
import logging
import ffmpeg
from db import MySQL
@ -70,3 +72,19 @@ def adjust_formats(user_id: "str", url: "str", formats: "list"):
for m in mapping.get(settings[1], []):
formats.insert(0, f"bestvideo[ext=mp4][height={m}]+bestaudio[ext=m4a]")
formats.insert(1, f"bestvideo[vcodec^=avc][height={m}]+bestaudio[acodec^=mp4a]/best[vcodec^=avc]/best")
def get_metadata(video_path):
width, height, duration = 1280, 720, 0
try:
video_streams = ffmpeg.probe(video_path, select_streams="v")
for item in video_streams.get("streams", []):
height = item["height"]
width = item["width"]
duration = int(float(video_streams["format"]["duration"]))
except Exception as e:
logging.error(e)
thumb = video_path + "-thunmnail.png"
ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run()
return dict(height=height, width=width, duration=duration, thumb=thumb)

View File

@ -9,36 +9,26 @@ __author__ = "Benny <benny.think@gmail.com>"
import logging
import os
import pathlib
import re
import tempfile
import typing
import ffmpeg
from apscheduler.schedulers.background import BackgroundScheduler
from pyrogram import Client, filters, types
from pyrogram.errors.exceptions.bad_request_400 import UserNotParticipant
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from tgbot_ping import get_runtime
from config import (APP_HASH, APP_ID, AUTHORIZED_USER, ENABLE_VIP, OWNER,
REQUIRED_MEMBERSHIP, TOKEN, WORKERS)
from client_init import create_app
from config import (AUTHORIZED_USER, ENABLE_CELERY, ENABLE_VIP, OWNER,
REQUIRED_MEMBERSHIP)
from constant import BotText
from db import MySQL, Redis
from downloader import convert_flac, sizeof_fmt, upload_hook, ytdl_download
from downloader import convert_flac
from limit import verify_payment
from tasks import download_entrance
from utils import customize_logger, get_user_settings, set_user_settings
def create_app(session="ytdl", workers=WORKERS):
_app = Client(session, APP_ID, APP_HASH,
bot_token=TOKEN, workers=workers,
proxy={'hostname': '127.0.0.1', 'port': 1086}
)
return _app
customize_logger(["pyrogram.client", "pyrogram.session.session", "pyrogram.client", "pyrogram.connection.connection"])
app = create_app()
bot_text = BotText()
@ -46,22 +36,6 @@ bot_text = BotText()
logging.info("Authorized users are %s", AUTHORIZED_USER)
def get_metadata(video_path):
width, height, duration = 1280, 720, 0
try:
video_streams = ffmpeg.probe(video_path, select_streams="v")
for item in video_streams.get("streams", []):
height = item["height"]
width = item["width"]
duration = int(float(video_streams["format"]["duration"]))
except Exception as e:
logging.error(e)
thumb = video_path + "-thunmnail.png"
ffmpeg.input(video_path, ss=duration / 2).filter('scale', width, -1).output(thumb, vframes=1).run()
return dict(height=height, width=width, duration=duration, thumb=thumb)
def private_use(func):
def wrapper(client: "Client", message: "types.Message"):
chat_id = getattr(message.from_user, "id", None)
@ -194,58 +168,10 @@ def download_handler(client: "Client", message: "types.Message"):
Redis().update_metrics("video_request")
bot_msg: typing.Union["types.Message", "typing.Any"] = message.reply_text("Processing", quote=True)
client.send_chat_action(chat_id, 'upload_video')
temp_dir = tempfile.TemporaryDirectory()
# temp_dir = tempfile.TemporaryDirectory()
download_entrance(bot_msg, client, url)
result = ytdl_download(url, temp_dir.name, bot_msg)
logging.info("Download complete.")
markup = InlineKeyboardMarkup(
[
[ # First row
InlineKeyboardButton( # Generates a callback query when pressed
"audio",
callback_data="audio"
)
]
]
)
if result["status"]:
client.send_chat_action(chat_id, 'upload_document')
video_paths = result["filepath"]
bot_msg.edit_text('Download complete. Sending now...')
for video_path in video_paths:
filename = pathlib.Path(video_path).name
remain = bot_text.remaining_quota_caption(chat_id)
size = sizeof_fmt(os.stat(video_path).st_size)
meta = get_metadata(video_path)
cap = f"`{filename}`\n\n{url}\n\nInfo: {meta['width']}x{meta['height']} {size}\n\n{remain}"
settings = get_user_settings(str(chat_id))
if settings[2] == "document":
logging.info("Sending as document")
client.send_document(chat_id, video_path,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
thumb=meta["thumb"]
)
else:
logging.info("Sending as video")
client.send_video(chat_id, video_path,
supports_streaming=True,
caption=cap,
progress=upload_hook, progress_args=(bot_msg,),
reply_markup=markup,
**meta
)
Redis().update_metrics("video_success")
bot_msg.edit_text('Download success!✅')
else:
client.send_chat_action(chat_id, 'typing')
tb = result["error"][0:4000]
bot_msg.edit_text(f"Download failed!❌\n\n```{tb}```", disable_web_page_preview=True)
temp_dir.cleanup()
# temp_dir.cleanup()
@app.on_callback_query(filters.regex(r"document|video"))
@ -302,7 +228,7 @@ if __name__ == '__main__':
By @BennyThink, VIP mode: {ENABLE_VIP}
By @BennyThink, VIP mode: {ENABLE_VIP}, Distribution: {ENABLE_CELERY}
"""
print(banner)
app.run()