GramCore/services/template/services.py

208 lines
8.0 KiB
Python
Raw Permalink Normal View History

import asyncio
from typing import Optional
from urllib.parse import urlencode, urljoin, urlsplit
from uuid import uuid4
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, FileSystemLoader, Template
from playwright.async_api import ViewportSize
from gram_core.application import Application
from gram_core.base_service import BaseService
from gram_core.config import config as application_config
from gram_core.dependence.aiobrowser import AioBrowser
from gram_core.services.template.cache import HtmlToFileIdCache, TemplatePreviewCache
from gram_core.services.template.error import QuerySelectorNotFound
from gram_core.services.template.models import FileType, RenderResult
from utils.const import PROJECT_ROOT
from utils.log import logger
__all__ = ("TemplateService", "TemplatePreviewer")
class TemplateService(BaseService):
def __init__(
self,
app: Application,
browser: AioBrowser,
html_to_file_id_cache: HtmlToFileIdCache,
preview_cache: TemplatePreviewCache,
template_dir: str = "resources",
):
self._browser = browser
self.template_dir = PROJECT_ROOT / template_dir
self._jinja2_env = Environment(
loader=FileSystemLoader(template_dir),
enable_async=True,
autoescape=True,
auto_reload=application_config.debug,
)
self.using_preview = application_config.debug and application_config.webserver.enable
if self.using_preview:
self.previewer = TemplatePreviewer(self, preview_cache, app.web_app)
self.html_to_file_id_cache = html_to_file_id_cache
def get_template(self, template_name: str) -> Template:
return self._jinja2_env.get_template(template_name)
async def render_async(self, template_name: str, template_data: dict) -> str:
"""模板渲染
:param template_name: 模板文件名
:param template_data: 模板数据
"""
loop = asyncio.get_event_loop()
start_time = loop.time()
template = self.get_template(template_name)
html = await template.render_async(**template_data)
logger.debug("%s 模板渲染使用了 %s", template_name, str(loop.time() - start_time))
return html
async def render(
self,
template_name: str,
template_data: dict,
viewport: Optional[ViewportSize] = None,
full_page: bool = True,
evaluate: Optional[str] = None,
query_selector: Optional[str] = None,
file_type: FileType = FileType.PHOTO,
ttl: int = 24 * 60 * 60,
caption: Optional[str] = None,
parse_mode: Optional[str] = None,
filename: Optional[str] = None,
) -> RenderResult:
"""模板渲染成图片
:param template_name: 模板文件名
:param template_data: 模板数据
:param viewport: 截图大小
:param full_page: 是否长截图
:param evaluate: 页面加载后运行的 js
:param query_selector: 截图选择器
:param file_type: 缓存的文件类型
:param ttl: 缓存时间
:param caption: 图片描述
:param parse_mode: 图片描述解析模式
:param filename: 文件名字
:return:
"""
loop = asyncio.get_event_loop()
start_time = loop.time()
template = self.get_template(template_name)
if self.using_preview:
preview_url = await self.previewer.get_preview_url(template_name, template_data)
logger.debug("调试模板 URL: \n%s", preview_url)
html = await template.render_async(**template_data)
logger.debug("%s 模板渲染使用了 %s", template_name, str(loop.time() - start_time))
file_id = await self.html_to_file_id_cache.get_data(html, file_type.name)
if file_id and not application_config.debug:
logger.debug("%s 命中缓存,返回 file_id[%s]", template_name, file_id)
return RenderResult(
html=html,
photo=file_id,
file_type=file_type,
cache=self.html_to_file_id_cache,
ttl=ttl,
caption=caption,
parse_mode=parse_mode,
filename=filename,
)
browser = await self._browser.get_browser()
start_time = loop.time()
page = await browser.new_page(viewport=viewport)
uri = (PROJECT_ROOT / template.filename).as_uri()
await page.goto(uri)
await page.set_content(html, wait_until="networkidle")
if evaluate:
await page.evaluate(evaluate)
clip = None
if query_selector:
try:
card = await page.query_selector(query_selector)
if not card:
raise QuerySelectorNotFound
clip = await card.bounding_box()
if not clip:
raise QuerySelectorNotFound
except QuerySelectorNotFound:
logger.warning("未找到 %s 元素", query_selector)
png_data = await page.screenshot(clip=clip, full_page=full_page)
await page.close()
logger.debug("%s 图片渲染使用了 %s", template_name, str(loop.time() - start_time))
return RenderResult(
html=html,
photo=png_data,
file_type=file_type,
cache=self.html_to_file_id_cache,
ttl=ttl,
caption=caption,
parse_mode=parse_mode,
filename=filename,
)
class TemplatePreviewer(BaseService, load=application_config.webserver.enable and application_config.debug):
def __init__(
self,
template_service: TemplateService,
cache: TemplatePreviewCache,
web_app: FastAPI,
):
self.web_app = web_app
self.template_service = template_service
self.cache = cache
self.register_routes()
async def get_preview_url(self, template: str, data: dict):
"""获取预览 URL"""
components = urlsplit(application_config.webserver.url)
path = urljoin("/preview/", template)
query = {}
# 如果有数据,暂存在 redis 中
if data:
key = str(uuid4())
await self.cache.set_data(key, data)
query["key"] = key
# noinspection PyProtectedMember
return components._replace(path=path, query=urlencode(query)).geturl()
def register_routes(self):
"""注册预览用到的路由"""
@self.web_app.get("/preview/{path:path}")
async def preview_template(path: str, key: Optional[str] = None): # pylint: disable=W0612
# 如果是 /preview/ 开头的静态文件,直接返回内容。比如使用相对链接 ../ 引入的静态资源
if not path.endswith((".html", ".jinja2")):
full_path = self.template_service.template_dir / path
if not full_path.is_file():
raise HTTPException(status_code=404, detail=f"Template '{path}' not found")
return FileResponse(full_path)
# 取回暂存的渲染数据
data = await self.cache.get_data(key) if key else {}
if key and data is None:
raise HTTPException(status_code=404, detail=f"Template data {key} not found")
# 渲染 jinja2 模板
html = await self.template_service.render_async(path, data)
# 将本地 URL file:// 修改为 HTTP url因为浏览器内不允许加载本地文件
# file:///project_dir/cache/image.jpg => /cache/image.jpg
html = html.replace(PROJECT_ROOT.as_uri(), "")
return HTMLResponse(html)
# 其他静态资源
for name in ["cache", "resources"]:
directory = PROJECT_ROOT / name
directory.mkdir(exist_ok=True)
self.web_app.mount(f"/{name}", StaticFiles(directory=PROJECT_ROOT / name), name=name)