Use ffmpeg to convert GIF to MP4

This commit is contained in:
洛水居室 2023-04-14 11:53:15 +08:00 committed by GitHub
parent 154b5fd243
commit 3fd01d5d80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 11 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import os
import re
from typing import List
@ -118,11 +119,15 @@ class Hyperion:
return art_list
async def download_image(self, art_id: int, url: str, page: int = 0) -> ArtworkImage:
image = url.endswith(".jpg") or url.endswith(".png")
filename = os.path.basename(url)
_, file_extension = os.path.splitext(filename)
is_image = bool(file_extension in ".jpg" or file_extension in ".png")
response = await self.client.get(
url, params=self.get_images_params(resize=2000) if image else None, timeout=10, de_json=False
url, params=self.get_images_params(resize=2000) if is_image else None, timeout=10, de_json=False
)
return ArtworkImage(
art_id=art_id, page=page, file_name=filename, file_extension=url.split(".")[-1], data=response.content
)
return ArtworkImage(art_id=art_id, page=page, ext=url.split(".")[-1], data=response.content)
async def get_new_list(self, gids: int, type_id: int, page_size: int = 20):
"""

View File

@ -11,7 +11,8 @@ class ArtworkImage(BaseModel):
art_id: int
page: int = 0
data: bytes = b""
ext: str = "jpg"
file_name: Optional[str] = None
file_extension: Optional[str] = None
is_error: bool = False
@property

View File

@ -1,5 +1,7 @@
import os
from typing import List, Optional, Tuple, TYPE_CHECKING, Union
import aiofiles
from bs4 import BeautifulSoup
from httpx import Timeout
from telegram import (
@ -20,6 +22,7 @@ from core.config import config
from core.plugin import Plugin, conversation, handler
from modules.apihelper.client.components.hyperion import Hyperion
from modules.apihelper.error import APIHelperException
from utils.helpers import execute, sha1
from utils.log import logger
if TYPE_CHECKING:
@ -59,11 +62,21 @@ class Post(Plugin.Conversation):
)
)
self.last_post_id_list: List[int] = []
self.ffmpeg_enable = False
self.cache_dir = os.path.join(os.getcwd(), "cache")
async def initialize(self):
if config.channels and len(config.channels) > 0:
logger.success("文章定时推送处理已经开启")
self.application.job_queue.run_repeating(self.task, 60)
logger.success("文章定时推送处理已经开启")
output = await execute("ffmpeg -version")
if "ffmpeg version" in output:
self.ffmpeg_enable = True
logger.info("检测到 ffmpeg 可用 已经启动编码转换")
logger.debug("ffmpeg version info\n%s", output)
else:
logger.debug("ffmpeg 不可用 已经禁用编码转换")
async def task(self, context: "ContextTypes.DEFAULT_TYPE"):
temp_post_id_list: List[int] = []
@ -145,14 +158,59 @@ class Post(Plugin.Conversation):
def input_media(
media: "ArtworkImage", *args, **kwargs
) -> Union[None, InputMediaDocument, InputMediaPhoto, InputMediaVideo]:
file_type = media.format
if file_type is not None:
if file_type.lower() in {"jpg", "jpeg", "png", "webp"}:
file_extension = media.file_extension
filename = media.file_name
if file_extension is not None:
if file_extension in {"jpg", "jpeg", "png", "webp"}:
return InputMediaPhoto(media.data, *args, **kwargs)
if file_type.lower() in {"gif", "mp4", "mov", "avi", "mkv", "webm", "flv"}:
return InputMediaVideo(media.data, *args, **kwargs)
if file_extension in {"gif", "mp4", "mov", "avi", "mkv", "webm", "flv"}:
return InputMediaVideo(media.data, filename=filename, *args, **kwargs)
return InputMediaDocument(media.data, *args, **kwargs)
@staticmethod
def get_ffmpeg_command(input_file: str, output_file: str):
return f'ffmpeg -i "{input_file}" -c:v libx264 -crf 20 -vf "fps=30,format=yuv420p" "{output_file}"'
async def gif_to_mp4(self, media: "List[ArtworkImage]"):
if self.ffmpeg_enable:
for i in media:
if i.file_extension == "gif":
file_path = os.path.join(self.cache_dir, i.file_name)
file_name, _ = os.path.splitext(i.file_name)
output_file = file_name + ".mp4"
output_path = os.path.join(self.cache_dir, output_file)
if os.path.exists(output_path):
async with aiofiles.open(output_path, mode="rb") as f:
i.data = await f.read()
i.file_name = output_file
i.file_extension = "mp4"
continue
async with aiofiles.open(file_path, mode="wb") as f:
await f.write(i.data)
temp_file = sha1(file_name) + ".mp4"
temp_path = os.path.join(self.cache_dir, temp_file)
command = self.get_ffmpeg_command(file_path, temp_path)
result = await execute(command)
if "exiting" in result:
logger.error("ffmpeg 执行失败\n%s", result)
continue
logger.debug("ffmpeg 执行成功\n%s", result)
os.rename(temp_path, output_path)
if os.path.exists(output_path):
async with aiofiles.open(output_path, mode="rb") as f:
i.data = await f.read()
i.file_name = output_file
i.file_extension = "mp4"
else:
logger.error(
"输出文件不存在!\nfile_path[%s]\noutput_path[%s]\ntemp_file[%s]\nffmpeg result[%s]\n",
file_path,
output_path,
temp_file,
result,
)
return media
@conversation.entry_point
@handler.callback_query(pattern=r"^post_admin\|", block=False)
async def callback_query_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> int:
@ -219,6 +277,7 @@ class Post(Plugin.Conversation):
async def send_post_info(self, post_handler_data: PostHandlerData, message: "Message", post_id: int) -> int:
post_info = await self.bbs.get_post_info(self.gids, post_id)
post_images = await self.bbs.get_images_by_post_id(self.gids, post_id)
post_images = await self.gif_to_mp4(post_images)
post_data = post_info["post"]["post"]
post_subject = post_data["subject"]
post_soup = BeautifulSoup(post_data["content"], features="html.parser")
@ -229,7 +288,7 @@ class Post(Plugin.Conversation):
await message.reply_text(f"警告!图片字符描述已经超过 {MessageLimit.CAPTION_LENGTH} 个字,已经切割")
try:
if len(post_images) > 1:
media = [self.input_media(img_info) for img_info in post_images if img_info.format]
media = [self.input_media(img_info) for img_info in post_images if not img_info.is_error]
media[0] = self.input_media(media=post_images[0], caption=post_text, parse_mode=ParseMode.MARKDOWN_V2)
if len(media) > 10:
media = media[:10]
@ -406,7 +465,7 @@ class Post(Plugin.Conversation):
post_text += f" \\#{tag}"
try:
if len(post_images) > 1:
media = [self.input_media(img_info) for img_info in post_images if img_info.format]
media = [self.input_media(img_info) for img_info in post_images if not img_info.is_error]
media[0] = self.input_media(media=post_images[0], caption=post_text, parse_mode=ParseMode.MARKDOWN_V2)
await context.bot.send_media_group(channel_id, media=media, write_timeout=len(media) * 5)
elif len(post_images) == 1: