mirror of
https://github.com/PaiGramTeam/FixMiYouShe.git
synced 2024-11-25 09:27:45 +00:00
✨ Support parse article
This commit is contained in:
parent
c294f3aec6
commit
d6d592cdcf
3
.env.example
Normal file
3
.env.example
Normal file
@ -0,0 +1,3 @@
|
||||
DEBUG=False
|
||||
DOMAIN=127.0.0.1
|
||||
PORT=8080
|
5
.gitignore
vendored
5
.gitignore
vendored
@ -157,4 +157,7 @@ cython_debug/
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
.idea/
|
||||
|
||||
# cache
|
||||
cache/
|
||||
|
@ -1,2 +1,3 @@
|
||||
# FixMiYouShe
|
||||
|
||||
Embed MiYouShe posts, videos, polls, and more on Telegram
|
||||
|
8
main.py
Normal file
8
main.py
Normal file
@ -0,0 +1,8 @@
|
||||
from uvicorn import run
|
||||
|
||||
from src.app import app
|
||||
from src.env import PORT
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run(app, host="0.0.0.0", port=PORT)
|
13
requirements.txt
Normal file
13
requirements.txt
Normal file
@ -0,0 +1,13 @@
|
||||
httpx==0.24.1
|
||||
fastapi~=0.101.1
|
||||
starlette~=0.31.0
|
||||
uvicorn~=0.23.2
|
||||
pydantic
|
||||
python-dotenv
|
||||
coloredlogs
|
||||
pytz
|
||||
apscheduler
|
||||
aiofiles==23.2.1
|
||||
jinja2==3.1.2
|
||||
beautifulsoup4
|
||||
lxml
|
3
src/__init__.py
Normal file
3
src/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
import jinja2
|
||||
|
||||
template_env = jinja2.Environment(loader=jinja2.FileSystemLoader("src/templates"))
|
0
src/api/__init__.py
Normal file
0
src/api/__init__.py
Normal file
46
src/api/httpxrequest.py
Normal file
46
src/api/httpxrequest.py
Normal file
@ -0,0 +1,46 @@
|
||||
from contextlib import AbstractAsyncContextManager
|
||||
from types import TracebackType
|
||||
from typing import Optional, Type
|
||||
|
||||
import httpx
|
||||
|
||||
__all__ = ("HTTPXRequest",)
|
||||
|
||||
timeout_int = 20
|
||||
timeout = httpx.Timeout(
|
||||
timeout=timeout_int,
|
||||
read=timeout_int,
|
||||
write=timeout_int,
|
||||
connect=timeout_int,
|
||||
pool=timeout_int,
|
||||
)
|
||||
|
||||
|
||||
class HTTPXRequest(AbstractAsyncContextManager):
|
||||
def __init__(self, *args, headers=None, **kwargs):
|
||||
self._client = httpx.AsyncClient(headers=headers, *args, **kwargs)
|
||||
|
||||
async def __aenter__(self):
|
||||
try:
|
||||
await self.initialize()
|
||||
return self
|
||||
except Exception as exc:
|
||||
await self.shutdown()
|
||||
raise exc
|
||||
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_val: Optional[BaseException],
|
||||
exc_tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
await self.initialize()
|
||||
|
||||
async def initialize(self):
|
||||
if self._client.is_closed:
|
||||
self._client = httpx.AsyncClient(timeout=timeout)
|
||||
|
||||
async def shutdown(self):
|
||||
if self._client.is_closed:
|
||||
return
|
||||
await self._client.aclose()
|
98
src/api/hyperion.py
Normal file
98
src/api/hyperion.py
Normal file
@ -0,0 +1,98 @@
|
||||
from .hyperionrequest import HyperionRequest
|
||||
from .models import PostInfo
|
||||
from ..typedefs import JSON_DATA
|
||||
|
||||
__all__ = ("Hyperion",)
|
||||
|
||||
|
||||
class Hyperion:
|
||||
"""米忽悠bbs相关API请求
|
||||
|
||||
该名称来源于米忽悠的安卓BBS包名结尾,考虑到大部分重要的功能确实是在移动端实现了
|
||||
"""
|
||||
|
||||
POST_FULL_URL = "https://bbs-api.miyoushe.com/post/wapi/getPostFull"
|
||||
POST_FULL_IN_COLLECTION_URL = (
|
||||
"https://bbs-api.miyoushe.com/post/wapi/getPostFullInCollection"
|
||||
)
|
||||
GET_NEW_LIST_URL = "https://bbs-api.miyoushe.com/post/wapi/getNewsList"
|
||||
GET_OFFICIAL_RECOMMENDED_POSTS_URL = (
|
||||
"https://bbs-api.miyoushe.com/post/wapi/getOfficialRecommendedPosts"
|
||||
)
|
||||
|
||||
USER_AGENT = (
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
|
||||
"Chrome/90.0.4430.72 Safari/537.36"
|
||||
)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.client = HyperionRequest(headers=self.get_headers(), *args, **kwargs)
|
||||
|
||||
def get_headers(self, referer: str = "https://www.miyoushe.com/ys/"):
|
||||
return {"User-Agent": self.USER_AGENT, "Referer": referer}
|
||||
|
||||
@staticmethod
|
||||
def get_list_url_params(
|
||||
forum_id: int, is_good: bool = False, is_hot: bool = False, page_size: int = 20
|
||||
) -> dict:
|
||||
return {
|
||||
"forum_id": forum_id,
|
||||
"gids": 2,
|
||||
"is_good": is_good,
|
||||
"is_hot": is_hot,
|
||||
"page_size": page_size,
|
||||
"sort_type": 1,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def get_images_params(
|
||||
resize: int = 600,
|
||||
quality: int = 80,
|
||||
auto_orient: int = 0,
|
||||
interlace: int = 1,
|
||||
images_format: str = "jpg",
|
||||
):
|
||||
"""
|
||||
image/resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,jpg
|
||||
:param resize: 图片大小
|
||||
:param quality: 图片质量
|
||||
:param auto_orient: 自适应
|
||||
:param interlace: 未知
|
||||
:param images_format: 图片格式
|
||||
:return:
|
||||
"""
|
||||
params = (
|
||||
f"image/resize,s_{resize}/quality,q_{quality}/auto-orient,"
|
||||
f"{auto_orient}/interlace,{interlace}/format,{images_format}"
|
||||
)
|
||||
return {"x-oss-process": params}
|
||||
|
||||
async def get_post_full_in_collection(
|
||||
self, collection_id: int, gids: int = 2, order_type=1
|
||||
) -> JSON_DATA:
|
||||
params = {
|
||||
"collection_id": collection_id,
|
||||
"gids": gids,
|
||||
"order_type": order_type,
|
||||
}
|
||||
response = await self.client.get(
|
||||
url=self.POST_FULL_IN_COLLECTION_URL, params=params
|
||||
)
|
||||
return response
|
||||
|
||||
async def get_post_info(self, gids: int, post_id: int, read: int = 1) -> PostInfo:
|
||||
params = {"gids": gids, "post_id": post_id, "read": read}
|
||||
response = await self.client.get(self.POST_FULL_URL, params=params)
|
||||
return PostInfo.paste_data(response)
|
||||
|
||||
async def get_new_list(self, gids: int, type_id: int, page_size: int = 20):
|
||||
"""
|
||||
?gids=2&page_size=20&type=3
|
||||
:return:
|
||||
"""
|
||||
params = {"gids": gids, "page_size": page_size, "type": type_id}
|
||||
response = await self.client.get(url=self.GET_NEW_LIST_URL, params=params)
|
||||
return response
|
||||
|
||||
async def close(self):
|
||||
await self.client.shutdown()
|
88
src/api/hyperionrequest.py
Normal file
88
src/api/hyperionrequest.py
Normal file
@ -0,0 +1,88 @@
|
||||
from typing import Union
|
||||
|
||||
import httpx
|
||||
from httpx import Response
|
||||
|
||||
from .httpxrequest import HTTPXRequest
|
||||
from ..error import NetworkException, ResponseException, APIHelperTimedOut
|
||||
from ..typedefs import POST_DATA, JSON_DATA
|
||||
|
||||
__all__ = ("HyperionRequest",)
|
||||
|
||||
|
||||
class HyperionRequest(HTTPXRequest):
|
||||
async def get(
|
||||
self,
|
||||
url: str,
|
||||
*args,
|
||||
de_json: bool = True,
|
||||
re_json_data: bool = False,
|
||||
**kwargs,
|
||||
) -> Union[POST_DATA, JSON_DATA, Response]:
|
||||
try:
|
||||
response = await self._client.get(url=url, *args, **kwargs)
|
||||
except httpx.TimeoutException as err:
|
||||
raise APIHelperTimedOut from err
|
||||
except httpx.HTTPError as exc:
|
||||
raise NetworkException(
|
||||
f"Unknown error in HTTP implementation: {repr(exc)}"
|
||||
) from exc
|
||||
if response.is_error:
|
||||
raise ResponseException(
|
||||
message=f"response error in status code: {response.status_code}"
|
||||
)
|
||||
if not de_json:
|
||||
return response
|
||||
json_data = response.json()
|
||||
return_code = json_data.get("retcode", None)
|
||||
data = json_data.get("data", None)
|
||||
message = json_data.get("message", None)
|
||||
if return_code is None:
|
||||
return json_data
|
||||
if return_code != 0:
|
||||
if message is None:
|
||||
raise ResponseException(
|
||||
message=f"response error in return code: {return_code}"
|
||||
)
|
||||
raise ResponseException(response=json_data)
|
||||
if not re_json_data and data is not None:
|
||||
return data
|
||||
return json_data
|
||||
|
||||
async def post(
|
||||
self,
|
||||
url: str,
|
||||
*args,
|
||||
de_json: bool = True,
|
||||
re_json_data: bool = False,
|
||||
**kwargs,
|
||||
) -> Union[POST_DATA, JSON_DATA, Response]:
|
||||
try:
|
||||
response = await self._client.post(url=url, *args, **kwargs)
|
||||
except httpx.TimeoutException as err:
|
||||
raise APIHelperTimedOut from err
|
||||
except httpx.HTTPError as exc:
|
||||
raise NetworkException(
|
||||
f"Unknown error in HTTP implementation: {repr(exc)}"
|
||||
) from exc
|
||||
if response.is_error:
|
||||
raise ResponseException(
|
||||
message=f"response error in status code: {response.status_code}"
|
||||
)
|
||||
if not de_json:
|
||||
return response
|
||||
json_data = response.json()
|
||||
return_code = json_data.get("retcode", None)
|
||||
data = json_data.get("data", None)
|
||||
message = json_data.get("message", None)
|
||||
if return_code is None:
|
||||
return json_data
|
||||
if return_code != 0:
|
||||
if message is None:
|
||||
raise ResponseException(
|
||||
message=f"response error in return code: {return_code}"
|
||||
)
|
||||
raise ResponseException(response=json_data)
|
||||
if not re_json_data and data is not None:
|
||||
return data
|
||||
return json_data
|
56
src/api/models.py
Normal file
56
src/api/models.py
Normal file
@ -0,0 +1,56 @@
|
||||
from typing import Any, List
|
||||
|
||||
from pydantic import BaseModel, PrivateAttr
|
||||
|
||||
__all__ = (
|
||||
"PostStat",
|
||||
"PostInfo",
|
||||
)
|
||||
|
||||
|
||||
class PostStat(BaseModel):
|
||||
reply_num: int = 0
|
||||
forward_num: int = 0
|
||||
like_num: int = 0
|
||||
view_num: int = 0
|
||||
bookmark_num: int = 0
|
||||
|
||||
|
||||
class PostInfo(BaseModel):
|
||||
_data: dict = PrivateAttr()
|
||||
post_id: int
|
||||
user_uid: int
|
||||
subject: str
|
||||
image_urls: List[str]
|
||||
created_at: int
|
||||
video_urls: List[str]
|
||||
|
||||
def __init__(self, _data: dict, **data: Any):
|
||||
super().__init__(**data)
|
||||
self._data = _data
|
||||
|
||||
@classmethod
|
||||
def paste_data(cls, data: dict) -> "PostInfo":
|
||||
_data_post = data["post"]
|
||||
post = _data_post["post"]
|
||||
post_id = post["post_id"]
|
||||
subject = post["subject"]
|
||||
image_list = _data_post["image_list"]
|
||||
image_urls = [image["url"] for image in image_list]
|
||||
vod_list = _data_post["vod_list"]
|
||||
video_urls = [vod["resolutions"][-1]["url"] for vod in vod_list]
|
||||
created_at = post["created_at"]
|
||||
user = _data_post["user"] # 用户数据
|
||||
user_uid = user["uid"] # 用户ID
|
||||
return PostInfo(
|
||||
_data=data,
|
||||
post_id=post_id,
|
||||
user_uid=user_uid,
|
||||
subject=subject,
|
||||
image_urls=image_urls,
|
||||
video_urls=video_urls,
|
||||
created_at=created_at,
|
||||
)
|
||||
|
||||
def __getitem__(self, item):
|
||||
return self._data[item]
|
17
src/app.py
Normal file
17
src/app.py
Normal file
@ -0,0 +1,17 @@
|
||||
from fastapi import FastAPI
|
||||
from starlette.middleware.trustedhost import TrustedHostMiddleware
|
||||
|
||||
from .env import DOMAIN, DEBUG
|
||||
from .route import get_routes
|
||||
from .route.base import UserAgentMiddleware
|
||||
|
||||
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
|
||||
app.add_middleware(
|
||||
TrustedHostMiddleware,
|
||||
allowed_hosts=[
|
||||
DOMAIN,
|
||||
],
|
||||
)
|
||||
if not DEBUG:
|
||||
app.add_middleware(UserAgentMiddleware)
|
||||
get_routes()
|
9
src/env.py
Normal file
9
src/env.py
Normal file
@ -0,0 +1,9 @@
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
DEBUG = os.getenv("DEBUG", "True").lower() == "true"
|
||||
DOMAIN = os.getenv("DOMAIN", "127.0.0.1")
|
||||
PORT = int(os.getenv("PORT", 8080))
|
61
src/error.py
Normal file
61
src/error.py
Normal file
@ -0,0 +1,61 @@
|
||||
from typing import Any, Mapping, Optional
|
||||
|
||||
|
||||
class APIHelperException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NetworkException(APIHelperException):
|
||||
pass
|
||||
|
||||
|
||||
class APIHelperTimedOut(APIHelperException):
|
||||
pass
|
||||
|
||||
|
||||
class ResponseException(APIHelperException):
|
||||
code: int = 0
|
||||
message: str = ""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
response: Optional[Mapping[str, Any]] = None,
|
||||
message: Optional[str] = None,
|
||||
) -> None:
|
||||
if response is None:
|
||||
self.message = message
|
||||
_message = message
|
||||
else:
|
||||
self.code = response.get("retcode", self.code)
|
||||
self.message = response.get("message", "")
|
||||
_message = f"[{self.code}] {self.message}"
|
||||
|
||||
super().__init__(_message)
|
||||
|
||||
|
||||
class DataNotFoundError(ResponseException):
|
||||
def __init__(self):
|
||||
message = "response data not find"
|
||||
super().__init__(message=message)
|
||||
|
||||
|
||||
class ReturnCodeError(ResponseException):
|
||||
def __init__(self):
|
||||
message = "response return code error"
|
||||
super().__init__(message=message)
|
||||
|
||||
|
||||
class ArticleError(Exception):
|
||||
"""Article error."""
|
||||
|
||||
def __init__(self, msg: str):
|
||||
self.msg = msg
|
||||
|
||||
|
||||
class ArticleNotFoundError(ArticleError):
|
||||
"""Article not found."""
|
||||
|
||||
def __init__(self, game_id: str, article_id: int):
|
||||
self.game_id = game_id
|
||||
self.article_id = article_id
|
||||
self.msg = f"Article {game_id} {article_id} not found."
|
14
src/log.py
Normal file
14
src/log.py
Normal file
@ -0,0 +1,14 @@
|
||||
import logging
|
||||
|
||||
from coloredlogs import ColoredFormatter
|
||||
|
||||
logs = logging.getLogger("FixMiYouShe")
|
||||
logging_format = "%(levelname)s [%(asctime)s] [%(name)s] %(message)s"
|
||||
logging_handler = logging.StreamHandler()
|
||||
logging_handler.setFormatter(ColoredFormatter(logging_format))
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.ERROR)
|
||||
root_logger.addHandler(logging_handler)
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logs.setLevel(logging.INFO)
|
||||
logger = logging.getLogger("FixMiYouShe")
|
0
src/render/__init__.py
Normal file
0
src/render/__init__.py
Normal file
122
src/render/article.py
Normal file
122
src/render/article.py
Normal file
@ -0,0 +1,122 @@
|
||||
from datetime import datetime
|
||||
from typing import Union, List
|
||||
|
||||
from bs4 import BeautifulSoup, Tag, PageElement
|
||||
|
||||
from src import template_env
|
||||
from src.api.hyperion import Hyperion
|
||||
from src.api.models import PostStat
|
||||
from src.error import ArticleNotFoundError
|
||||
from src.services.cache import (
|
||||
get_article_cache_file_path,
|
||||
get_article_cache_file,
|
||||
write_article_cache_file,
|
||||
)
|
||||
from src.services.scheduler import add_delete_file_job
|
||||
|
||||
GAME_ID_MAP = {"bh3": 1, "ys": 2, "bh2": 3, "wd": 4, "dby": 5, "sr": 6, "zzz": 8}
|
||||
CHANNEL_MAP = {"ys": "yuanshen", "sr": "HSRCN", "zzz": "ZZZNewsletter"}
|
||||
template = template_env.get_template("article.jinja2")
|
||||
|
||||
|
||||
def replace_br(text: str) -> str:
|
||||
if not text:
|
||||
return ""
|
||||
return text.replace("\n", "<br/>\n")
|
||||
|
||||
|
||||
def get_description(soup: BeautifulSoup) -> str:
|
||||
post_text = ""
|
||||
if post_p := soup.find_all("p"):
|
||||
for p in post_p:
|
||||
t = p.get_text()
|
||||
if not t:
|
||||
continue
|
||||
post_text += f"{replace_br(t)}<br/>\n"
|
||||
else:
|
||||
post_text += replace_br(soup.get_text())
|
||||
return post_text
|
||||
|
||||
|
||||
def parse_tag(tag: Union[Tag, PageElement]) -> str:
|
||||
if tag.name == "a":
|
||||
href = tag.get("href")
|
||||
if href and href.startswith("/"):
|
||||
href = f"https://www.miyoushe.com{href}"
|
||||
if href and href.startswith("http"):
|
||||
return f'<a href="{href}">{tag.get_text()}</a>'
|
||||
elif tag.name == "img":
|
||||
return f"<p>{str(tag)}</p>"
|
||||
elif tag.name == "p":
|
||||
t = tag.get_text()
|
||||
if not t:
|
||||
return ""
|
||||
post_text = []
|
||||
for tag_ in tag.children:
|
||||
if text := parse_tag(tag_):
|
||||
post_text.append(text)
|
||||
return "\n".join(post_text)
|
||||
elif tag.name == "div":
|
||||
post_text = []
|
||||
for tag_ in tag.children:
|
||||
if text := parse_tag(tag_):
|
||||
post_text.append(text)
|
||||
return "\n".join(post_text)
|
||||
text = tag.get_text().strip()
|
||||
if text:
|
||||
return f"<p>{replace_br(text)}</p>"
|
||||
|
||||
|
||||
def parse_content(soup: BeautifulSoup, title: str, video_urls: List[str]) -> str:
|
||||
post_text = f"<h1>{title}</h1>\n"
|
||||
if video_urls:
|
||||
for url in video_urls:
|
||||
post_text += f'<video controls="controls" src="{url}"></video>\n'
|
||||
for tag in soup.find("body").children:
|
||||
if text := parse_tag(tag):
|
||||
post_text += f"{text}\n"
|
||||
return post_text
|
||||
|
||||
|
||||
def parse_stat(stat: PostStat):
|
||||
return (
|
||||
f"<p>"
|
||||
f"💬 {stat.reply_num} "
|
||||
f"🔁 {stat.forward_num} "
|
||||
f"❤️ {stat.like_num} "
|
||||
f"🔖 {stat.bookmark_num} "
|
||||
f"👁️ {stat.view_num} "
|
||||
f"</p><br>\n"
|
||||
)
|
||||
|
||||
|
||||
async def process_article(game_id: str, post_id: int) -> str:
|
||||
path = get_article_cache_file_path(game_id, post_id)
|
||||
if content := await get_article_cache_file(path):
|
||||
return content
|
||||
gids = GAME_ID_MAP.get(game_id)
|
||||
if not gids:
|
||||
raise ArticleNotFoundError(game_id, post_id)
|
||||
hyperion = Hyperion()
|
||||
try:
|
||||
post_info = await hyperion.get_post_info(gids=gids, post_id=post_id)
|
||||
finally:
|
||||
await hyperion.close()
|
||||
post_data = post_info["post"]["post"]
|
||||
post_soup = BeautifulSoup(post_data["content"], features="lxml")
|
||||
author_data = post_info["post"]["user"]
|
||||
content = template.render(
|
||||
url=f"https://www.miyoushe.com/{game_id}/article/{post_id}",
|
||||
description=get_description(post_soup),
|
||||
published_time=datetime.fromtimestamp(post_info.created_at).strftime(
|
||||
"%Y-%m-%dT%H:%M:%S.%fZ"
|
||||
),
|
||||
channel=CHANNEL_MAP.get(game_id, "HSRCN"),
|
||||
article=parse_content(post_soup, post_info.subject, post_info.video_urls),
|
||||
stat=parse_stat(PostStat(**post_info["post"]["stat"])),
|
||||
post=post_data,
|
||||
author=author_data,
|
||||
)
|
||||
await write_article_cache_file(path, content)
|
||||
add_delete_file_job(path)
|
||||
return content
|
8
src/route/__init__.py
Normal file
8
src/route/__init__.py
Normal file
@ -0,0 +1,8 @@
|
||||
def get_routes():
|
||||
from .article import parse_article
|
||||
from .error import validation_exception_handler
|
||||
|
||||
return [
|
||||
parse_article,
|
||||
validation_exception_handler,
|
||||
]
|
23
src/route/article.py
Normal file
23
src/route/article.py
Normal file
@ -0,0 +1,23 @@
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import HTMLResponse
|
||||
|
||||
from .base import get_redirect_response
|
||||
from ..app import app
|
||||
from ..error import ArticleError, ResponseException
|
||||
from ..log import logger
|
||||
from ..render.article import process_article
|
||||
|
||||
|
||||
@app.get("/{game_id}/article/{post_id}")
|
||||
async def parse_article(game_id: str, post_id: int, request: Request):
|
||||
try:
|
||||
return HTMLResponse(await process_article(game_id, post_id))
|
||||
except ResponseException as e:
|
||||
logger.warning(e.message)
|
||||
return get_redirect_response(request)
|
||||
except ArticleError as e:
|
||||
logger.warning(e.msg)
|
||||
return get_redirect_response(request)
|
||||
except Exception as _:
|
||||
logger.exception("Failed to get article.")
|
||||
return get_redirect_response(request)
|
24
src/route/base.py
Normal file
24
src/route/base.py
Normal file
@ -0,0 +1,24 @@
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from starlette.middleware.base import BaseHTTPMiddleware
|
||||
from starlette.responses import RedirectResponse
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from starlette.middleware.base import RequestResponseEndpoint
|
||||
from starlette.requests import Request
|
||||
from starlette.responses import Response
|
||||
|
||||
|
||||
def get_redirect_response(request: "Request") -> RedirectResponse:
|
||||
path = request.url.path
|
||||
return RedirectResponse(url=f"https://www.miyoushe.com{path}", status_code=302)
|
||||
|
||||
|
||||
class UserAgentMiddleware(BaseHTTPMiddleware):
|
||||
async def dispatch(
|
||||
self, request: "Request", call_next: "RequestResponseEndpoint"
|
||||
) -> "Response":
|
||||
user_agent = request.headers.get("User-Agent")
|
||||
if (not user_agent) or ("telegram" not in user_agent.lower()):
|
||||
return get_redirect_response(request)
|
||||
return await call_next(request)
|
11
src/route/error.py
Normal file
11
src/route/error.py
Normal file
@ -0,0 +1,11 @@
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from starlette.requests import Request
|
||||
|
||||
from src.app import app
|
||||
from src.route.base import get_redirect_response
|
||||
|
||||
|
||||
@app.exception_handler(RequestValidationError)
|
||||
@app.exception_handler(404)
|
||||
async def validation_exception_handler(request: "Request", _):
|
||||
return get_redirect_response(request)
|
0
src/services/__init__.py
Normal file
0
src/services/__init__.py
Normal file
23
src/services/cache.py
Normal file
23
src/services/cache.py
Normal file
@ -0,0 +1,23 @@
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import aiofiles
|
||||
|
||||
cache_dir = Path("cache")
|
||||
cache_dir.mkdir(exist_ok=True)
|
||||
|
||||
|
||||
def get_article_cache_file_path(game_id: str, article_id: int) -> Path:
|
||||
return cache_dir / f"article_{game_id}_{article_id}.html"
|
||||
|
||||
|
||||
async def get_article_cache_file(path: Path) -> Optional[str]:
|
||||
if not path.exists():
|
||||
return None
|
||||
async with aiofiles.open(path, "r", encoding="utf-8") as f:
|
||||
return await f.read()
|
||||
|
||||
|
||||
async def write_article_cache_file(path: Path, content: str) -> None:
|
||||
async with aiofiles.open(path, "w", encoding="utf-8") as f:
|
||||
await f.write(content)
|
28
src/services/scheduler.py
Normal file
28
src/services/scheduler.py
Normal file
@ -0,0 +1,28 @@
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import pytz
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
|
||||
if not scheduler.running:
|
||||
scheduler.start()
|
||||
|
||||
|
||||
async def delete_file(path: Path):
|
||||
path = Path(path)
|
||||
if path.exists():
|
||||
path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
def add_delete_file_job(path: Path, delete_seconds: int = 3600):
|
||||
scheduler.add_job(
|
||||
delete_file,
|
||||
"date",
|
||||
id=f"{hash(path)}|delete_file",
|
||||
name=f"{hash(path)}|delete_file",
|
||||
args=[path],
|
||||
run_date=datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
|
||||
+ datetime.timedelta(seconds=delete_seconds),
|
||||
replace_existing=True,
|
||||
)
|
63
src/templates/article.jinja2
Normal file
63
src/templates/article.jinja2
Normal file
@ -0,0 +1,63 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
_______ __ ___ ___ .___ ___. __ ____ ____ ______ __ __ _______. __ __ _______
|
||||
| ____|| | \ \ / / | \/ | | | \ \ / / / __ \ | | | | / || | | | | ____|
|
||||
| |__ | | \ V / | \ / | | | \ \/ / | | | | | | | | | (----`| |__| | | |__
|
||||
| __| | | > < | |\/| | | | \_ _/ | | | | | | | | \ \ | __ | | __|
|
||||
| | | | / . \ | | | | | | | | | `--' | | `--' | .----) | | | | | | |____
|
||||
|__| |__| /__/ \__\ |__| |__| |__| |__| \______/ \______/ |_______/ |__| |__| |_______|
|
||||
|
||||
Embed MiYouShe posts, videos, polls, and more on Telegram
|
||||
-->
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title></title>
|
||||
<link rel="canonical" href="{{ url }}"/>
|
||||
<meta property="theme-color" content="#00a8fc"/>
|
||||
<meta property="twitter:site" content="{{ author.nickname }}"/>
|
||||
<meta property="twitter:creator" content="{{ author.nickname }}"/>
|
||||
<meta property="twitter:title" content="{{ post.subject }} ({{ author.nickname }})"/>
|
||||
<meta property="twitter:image" content="{{ post.cover }}"/>
|
||||
<meta property="twitter:card" content="summary_large_image"/>
|
||||
|
||||
<meta property="og:url" content="{{ url }}"/>
|
||||
<meta property="og:image" content="{{ post.cover }}"/>
|
||||
<meta property="og:title" content="{{ post.subject }} ({{ author.nickname }})"/>
|
||||
<meta property="og:description" content="{{ description }}"/>
|
||||
<meta property="og:site_name" content="{{ post.subject }} - {{ author.nickname }} - 米游社"/>
|
||||
|
||||
<meta property="author" content="{{ author.nickname }}">
|
||||
<!-- Telegram -->
|
||||
<meta property="al:android:app_name" content="Medium"/>
|
||||
<meta property="article:published_time" content="{{ published_time }}"/>
|
||||
<meta name="telegram:channel" content="@{{ channel }}" />
|
||||
</head>
|
||||
<body>
|
||||
<section class="section-backgroundImage">
|
||||
<figure class="graf--layoutFillWidth"></figure>
|
||||
</section>
|
||||
<section class="section--first">
|
||||
If you can see this, your browser is doing something weird with your user agent.
|
||||
<a href="{{ url }}">View original post</a>
|
||||
</section>
|
||||
<article>
|
||||
<!-- article content -->
|
||||
{{ article }}
|
||||
<!-- stat -->
|
||||
{{ stat }}
|
||||
<p><a href="{{ url }}">查看原文</a></p>
|
||||
<!-- author -->
|
||||
<h3>作者信息</h3>
|
||||
{% if author.avatar_url %}
|
||||
<img src="{{ author.avatar_url }}" alt="profile picture"/>
|
||||
{% endif %}
|
||||
<h2>{{ author.nickname }}</h2>
|
||||
<p>
|
||||
<a href="https://www.miyoushe.com/sr/accountCenter/postList?id={{ author.uid }}">@{{ author.nickname }}</a>
|
||||
lv.{{ author.level_exp.level }}
|
||||
</p>
|
||||
<p><a href="{{ url }}">查看原文</a></p>
|
||||
</article>
|
||||
</body>
|
||||
</html>
|
6
src/typedefs.py
Normal file
6
src/typedefs.py
Normal file
@ -0,0 +1,6 @@
|
||||
from typing import Any, Dict
|
||||
|
||||
__all__ = ("POST_DATA", "JSON_DATA")
|
||||
|
||||
POST_DATA = Dict[str, Any]
|
||||
JSON_DATA = Dict[str, Any]
|
Loading…
Reference in New Issue
Block a user