feat: telegram inline bot

This commit is contained in:
omg-xtao 2024-05-10 22:06:34 +08:00 committed by GitHub
parent 4175807801
commit e097a462b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 484 additions and 41 deletions

View File

@ -3,3 +3,10 @@ DOMAIN=127.0.0.1
PORT=8080
MIYOUSHE=true
HOYOLAB=true
BOT=true
BOT_API_ID=XX
BOT_API_HASH=XXX
BOT_TOKEN=XX
MIYOUSHE_HOST=www.miyoushe.pp.ua
HOYOLAB_HOST=www.hoyolab.pp.ua
USER_AGENT=xxx

1
.gitignore vendored
View File

@ -161,3 +161,4 @@ cython_debug/
# cache
cache/
data/

54
main.py
View File

@ -1,8 +1,30 @@
import asyncio
import uvicorn
from src.app import app
from src.env import PORT, MIYOUSHE, HOYOLAB
from signal import signal as signal_fn, SIGINT, SIGTERM, SIGABRT
from src.app import web
from src.bot import bot
from src.env import MIYOUSHE, HOYOLAB, BOT
async def idle():
task = None
def signal_handler(_, __):
if web.web_server_task:
web.web_server_task.cancel()
task.cancel()
for s in (SIGINT, SIGTERM, SIGABRT):
signal_fn(s, signal_handler)
while True:
task = asyncio.create_task(asyncio.sleep(600))
web.bot_main_task = task
try:
await task
except asyncio.CancelledError:
break
async def main():
@ -14,15 +36,23 @@ async def main():
from src.render.article_hoyolab import refresh_hoyo_recommend_posts
await refresh_hoyo_recommend_posts()
web_server = uvicorn.Server(config=uvicorn.Config(app, host="0.0.0.0", port=PORT))
server_config = web_server.config
server_config.setup_event_loop()
if not server_config.loaded:
server_config.load()
web_server.lifespan = server_config.lifespan_class(server_config)
await web_server.startup()
await web_server.main_loop()
await web.start()
if BOT:
await bot.start()
try:
await idle()
finally:
if BOT:
try:
await bot.stop()
except RuntimeError:
pass
if web.web_server:
try:
await web.web_server.shutdown()
except AttributeError:
pass
if __name__ == "__main__":
asyncio.run(main())
asyncio.get_event_loop().run_until_complete(main())

View File

@ -11,3 +11,6 @@ aiofiles==23.2.1
jinja2==3.1.3
beautifulsoup4
lxml
tgcrypto
pyrogram
urlextract

24
src/api/bot_request.py Normal file
View File

@ -0,0 +1,24 @@
from typing import Optional
from httpx import AsyncClient
from src.api.models import PostInfo
from src.env import USER_AGENT
client = AsyncClient(
headers=(
{
"User-Agent": USER_AGENT,
}
if USER_AGENT
else {}
)
)
async def get_post_info(url: str) -> Optional[PostInfo]:
real_url = f"{url}json" if url.endswith("/") else f"{url}/json"
req = await client.get(real_url)
if req.status_code != 200:
return None
return PostInfo(_data={}, **req.json())

View File

@ -54,10 +54,12 @@ class Hoyolab:
async def get_news_bg(self) -> GameBgData:
params = {"with_channel": "1"}
headers = {
'x-rpc-app_version': '2.50.0',
'x-rpc-client_type': '4',
"x-rpc-app_version": "2.50.0",
"x-rpc-client_type": "4",
}
response = await self.client.get(url=self.NEW_BG_URL, params=params, headers=headers)
response = await self.client.get(
url=self.NEW_BG_URL, params=params, headers=headers
)
return GameBgData(**response)
async def close(self):

View File

@ -1,19 +1,61 @@
import asyncio
import uvicorn
from fastapi import FastAPI
from starlette.middleware.trustedhost import TrustedHostMiddleware
from .env import DOMAIN, DEBUG
from .env import DOMAIN, DEBUG, PORT
from .route import get_routes
from .route.base import UserAgentMiddleware
from .services.scheduler import register_scheduler
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
class Web:
def __init__(self):
self.web_server = None
self.web_server_task = None
self.bot_main_task = None
@staticmethod
def init_web():
if not DEBUG:
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=[
DOMAIN,
],
)
if not DEBUG:
app.add_middleware(UserAgentMiddleware)
get_routes()
register_scheduler(app)
async def start(self):
self.init_web()
self.web_server = uvicorn.Server(
config=uvicorn.Config(app, host="127.0.0.1", port=PORT)
)
server_config = self.web_server.config
server_config.setup_event_loop()
if not server_config.loaded:
server_config.load()
self.web_server.lifespan = server_config.lifespan_class(server_config)
try:
await self.web_server.startup()
except OSError as e:
raise SystemExit from e
if self.web_server.should_exit:
raise SystemExit from None
self.web_server_task = asyncio.create_task(self.web_server.main_loop())
def stop(self):
if self.web_server_task:
self.web_server_task.cancel()
if self.bot_main_task:
self.bot_main_task.cancel()
web = Web()

17
src/bot.py Normal file
View File

@ -0,0 +1,17 @@
from pyrogram import Client
from pathlib import Path
from .env import BOT_API_ID_INT, BOT_API_HASH, BOT_TOKEN
data_path = Path("data")
data_path.mkdir(exist_ok=True)
bot = Client(
"bot",
api_id=int(BOT_API_ID_INT),
api_hash=BOT_API_HASH,
bot_token=BOT_TOKEN,
workdir="data",
plugins=dict(root="src/plugins"),
)

View File

@ -9,3 +9,15 @@ DOMAIN = os.getenv("DOMAIN", "127.0.0.1")
PORT = int(os.getenv("PORT", 8080))
MIYOUSHE = os.getenv("MIYOUSHE", "True").lower() == "true"
HOYOLAB = os.getenv("HOYOLAB", "True").lower() == "true"
BOT = os.getenv("BOT", "True").lower() == "true"
BOT_API_ID = os.getenv("BOT_API_ID")
BOT_API_ID_INT = 0
try:
BOT_API_ID_INT = int(BOT_API_ID)
except ValueError:
pass
BOT_API_HASH = os.getenv("BOT_API_HASH")
BOT_TOKEN = os.getenv("BOT_TOKEN")
MIYOUSHE_HOST = os.getenv("MIYOUSHE_HOST")
HOYOLAB_HOST = os.getenv("HOYOLAB_HOST")
USER_AGENT = os.getenv("USER_AGENT")

View File

@ -9,6 +9,9 @@ logging_handler.setFormatter(ColoredFormatter(logging_format))
root_logger = logging.getLogger()
root_logger.setLevel(logging.ERROR)
root_logger.addHandler(logging_handler)
pyro_logger = logging.getLogger("pyrogram")
pyro_logger.setLevel(logging.INFO)
pyro_logger.addHandler(logging_handler)
logging.basicConfig(level=logging.INFO)
logs.setLevel(logging.INFO)
logging.getLogger("apscheduler").setLevel(logging.INFO)

0
src/plugins/__init__.py Normal file
View File

100
src/plugins/inline.py Normal file
View File

@ -0,0 +1,100 @@
from typing import List, Optional
from pyrogram.types import (
InputTextMessageContent,
InlineQueryResultArticle,
InlineQuery,
InlineQueryResult,
InlineQueryResultPhoto,
InlineQueryResultDocument,
)
from .start import get_test_button
from ..api.bot_request import get_post_info
from ..api.models import PostInfo
from ..bot import bot
from ..utils.url import get_lab_link
def get_help_article() -> InlineQueryResultArticle:
text = f"欢迎使用 @{bot.me.username} 来转换 米游社/HoYoLab 链接,您也可以将 Bot 添加到群组或频道自动匹配消息。"
return InlineQueryResultArticle(
title=">> 帮助 <<",
description="将 Bot 添加到群组或频道可以自动匹配消息。",
input_message_content=InputTextMessageContent(text),
reply_markup=get_test_button(),
)
def get_article(message: str) -> Optional[InlineQueryResultArticle]:
return InlineQueryResultArticle(
title=">> 转换结果 <<",
description="点击发送文本",
input_message_content=InputTextMessageContent(
message, disable_web_page_preview=False
),
)
def get_notice(message: str) -> InlineQueryResultArticle:
return InlineQueryResultArticle(
title=">> 如果没有正确显示图片和原图,请稍后重试 <<",
description="服务器请求中,请等待...",
input_message_content=InputTextMessageContent(
message, disable_web_page_preview=False
),
)
async def add_document_results(
message: str, post_info: PostInfo
) -> List[InlineQueryResult]:
result = []
text = f"<b>{post_info.subject}</b>\n\n{message}"[:1000]
if post_info.image_urls:
img = post_info.image_urls[0]
result.append(
InlineQueryResultPhoto(
photo_url=img,
title="图片",
description="发送图片",
caption=text,
)
)
result.append(
InlineQueryResultDocument(
document_url=img,
title="原图",
description="发送原图",
caption=text,
)
)
return result
@bot.on_inline_query()
async def inline(_, query: InlineQuery):
message = query.query
results = [get_help_article()]
if message:
replace_list = get_lab_link(message)
if replace_list:
replaced_message = message
for k, v in replace_list.items():
replaced_message = message.replace(k, v)
results.append(get_article(replaced_message))
post_info = None
try:
post_info = await get_post_info(list(replace_list.values())[0])
except Exception:
pass
if post_info:
files = await add_document_results(message, post_info)
if files:
results.append(get_notice(replaced_message))
results += files
await query.answer(
switch_pm_text="🔎 输入 米游社/HoYoLab 链接来转换",
switch_pm_parameter="start",
results=results,
)

71
src/plugins/message.py Normal file
View File

@ -0,0 +1,71 @@
from pyrogram import filters
from pyrogram.enums import MessageEntityType
from pyrogram.errors import WebpageNotFound
from pyrogram.types import Message, MessageEntity
from src.bot import bot
from src.log import logger
from src.utils.url import get_lab_link
async def _need_chat(_, __, m: Message):
return m.chat
async def _need_text(_, __, m: Message):
return m.text or m.caption
async def _forward_from_bot(_, __, m: Message):
return m.forward_from and m.forward_from.is_bot
need_chat = filters.create(_need_chat)
need_text = filters.create(_need_text)
forward_from_bot = filters.create(_forward_from_bot)
@bot.on_message(
filters=filters.incoming
& ~filters.via_bot
& need_text
& need_chat
& ~forward_from_bot,
group=1,
)
async def process_link(_, message: Message):
text = message.text or message.caption
markdown_text = text.markdown
if not markdown_text:
return
if markdown_text.startswith("~"):
return
links = get_lab_link(markdown_text)
if not links:
return
link_text = list(links.values())
logger.info("chat[%s] link_text %s", message.chat.id, link_text)
if not link_text:
return
try:
await message.reply_web_page(
text="",
quote=True,
url=link_text[0],
)
except WebpageNotFound:
text = "." * len(link_text)
entities = [
MessageEntity(
type=MessageEntityType.TEXT_LINK,
offset=idx,
length=idx + 1,
url=i,
)
for idx, i in enumerate(link_text)
]
await message.reply_text(
text=text,
quote=True,
entities=entities,
)

30
src/plugins/start.py Normal file
View File

@ -0,0 +1,30 @@
from pyrogram import filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from src.bot import bot
HELP_MSG = "此 BOT 将会自动回复可以转换成 Telegram 预览的 URL 链接,可以提供更直观、方便的浏览体验。"
TEST_URL = "https://m.miyoushe.com/ys?channel=xiaomi/#/article/51867765"
def get_test_button() -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
text="🍰 尝试一下",
switch_inline_query=TEST_URL,
),
]
]
)
@bot.on_message(filters=filters.command("start"))
async def start(_, message):
await message.reply_text(
HELP_MSG,
quote=True,
reply_markup=get_test_button(),
)

View File

@ -178,12 +178,16 @@ async def process_article_image(
)
async def process_article(game_id: str, post_id: int, i18n: I18n = I18n()) -> str:
async def get_post_info(game_id: str, post_id: int):
gids = GAME_ID_MAP.get(game_id)
if not gids:
raise ArticleNotFoundError(game_id, post_id)
async with Hyperion() as hyperion:
post_info = await hyperion.get_post_info(gids=gids, post_id=post_id)
return await hyperion.get_post_info(gids=gids, post_id=post_id)
async def process_article(game_id: str, post_id: int, i18n: I18n = I18n()) -> str:
post_info = await get_post_info(game_id, post_id)
if post_info.view_type in [PostType.TEXT, PostType.VIDEO]:
content = await process_article_text(post_info, get_recommend_post, i18n)
elif post_info.view_type == PostType.IMAGE:
@ -199,9 +203,9 @@ if MIYOUSHE:
async with Hyperion() as hyperion:
for key, gids in GAME_ID_MAP.items():
try:
RECOMMEND_POST_MAP[
key
] = await hyperion.get_official_recommended_posts(gids)
RECOMMEND_POST_MAP[key] = (
await hyperion.get_official_recommended_posts(gids)
)
except Exception as _:
logger.exception(f"Failed to get recommend posts gids={gids}")
logger.info("Finish to refresh recommend posts")

View File

@ -23,11 +23,11 @@ def get_recommend_post(post_info: PostInfo, i18n: I18n) -> List[PostRecommend]:
return [
PostRecommend(
post_id=post.post_id,
subject=post.multi_language_info.lang_subject.get(
i18n.lang.value, post.subject
)
subject=(
post.multi_language_info.lang_subject.get(i18n.lang.value, post.subject)
if post.multi_language_info
else post.subject,
else post.subject
),
)
for post in posts
if post.post_id != post_info.post_id
@ -53,6 +53,11 @@ async def process_article_video(
)
async def get_post_info(post_id: int, lang: str):
async with Hoyolab() as hoyolab:
return await hoyolab.get_post_info(post_id=post_id, lang=lang)
async def process_article(post_id: int, lang: str) -> str:
try:
i18n = I18n(i18n_alias.get(lang))
@ -60,8 +65,7 @@ async def process_article(post_id: int, lang: str) -> str:
i18n = I18n(lang)
except ValueError:
i18n = I18n()
async with Hoyolab() as hoyolab:
post_info = await hoyolab.get_post_info(post_id=post_id, lang=i18n.lang.value)
post_info = await get_post_info(post_id, i18n.lang.value)
if post_info.view_type == PostType.TEXT:
content = await process_article_text(post_info, get_recommend_post, i18n)
elif post_info.view_type == PostType.IMAGE:

View File

@ -5,7 +5,7 @@ from .base import get_redirect_response
from ..app import app
from ..error import ArticleError, ResponseException
from ..log import logger
from ..render.article import process_article
from ..render.article import process_article, get_post_info
@app.get("/{game_id}/article/{post_id}")
@ -19,5 +19,21 @@ async def parse_article(game_id: str, post_id: int, request: Request):
logger.warning(e.msg)
return get_redirect_response(request)
except Exception as _:
logger.exception(f"Failed to get article {game_id} {post_id}")
logger.exception(
"Failed to get article game_id[%s] post_id[%s]", game_id, post_id
)
return get_redirect_response(request)
@app.get("/{game_id}/article/{post_id}/json")
async def parse_article_json(game_id: str, post_id: int, request: Request):
try:
return await get_post_info(game_id, post_id)
except ArticleError as e:
logger.warning(e.msg)
return get_redirect_response(request)
except Exception as _:
logger.exception(
"Failed to get article game_id[%s] post_id[%s]", game_id, post_id
)
return get_redirect_response(request)

View File

@ -5,13 +5,15 @@ from .base import get_redirect_response
from ..app import app
from ..error import ArticleError, ResponseException
from ..log import logger
from ..render.article_hoyolab import process_article
from ..render.article_hoyolab import process_article, get_post_info
@app.get("/article/{post_id}")
@app.get("/article/{post_id}/{lang}")
async def parse_hoyo_article(post_id: int, request: Request, lang: str = "zh-cn"):
try:
if lang == "json":
return await get_post_info(post_id, "zh-cn")
return HTMLResponse(await process_article(post_id, lang))
except ResponseException as e:
logger.warning(e.message)

0
src/utils/__init__.py Normal file
View File

42
src/utils/url.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Dict
from urlextract import URLExtract
from httpx import URL
from src.env import HOYOLAB_HOST, MIYOUSHE_HOST
extractor = URLExtract()
def parse_link(url: URL) -> URL:
host = HOYOLAB_HOST
if "miyoushe" in url.host:
host = MIYOUSHE_HOST
if url.fragment:
path = ""
if url.path != "/":
path = url.path
new_url = URL(f"https://a{path}{url.fragment}")
return url.copy_with(host=host, path=new_url.path, fragment=None, query=None)
return url.copy_with(host=host)
def get_lab_link(url: str) -> Dict[str, str]:
data = {}
for old_url in extractor.find_urls(url):
u = URL(old_url)
if u.scheme not in ["http", "https"]:
continue
if u.host not in [
"www.miyoushe.com",
"m.miyoushe.com",
"www.hoyolab.com",
"m.hoyolab.com",
]:
continue
parsed_link = str(parse_link(u))
if "article" in parsed_link:
data[old_url] = parsed_link
return data

33
tests/test_url.py Normal file
View File

@ -0,0 +1,33 @@
import pytest
from httpx import URL
from src.utils.url import parse_link
@pytest.mark.asyncio
class TestUrl:
@staticmethod
async def test_hoyolab_desktop():
url = URL("https://www.hoyolab.com/article/25091304")
real = parse_link(url)
assert real == URL("https://www.hoyolab.pp.ua/article/25091304")
@staticmethod
async def test_hoyolab_android():
url = URL(
"https://m.hoyolab.com/#/article/25091304?utm_source=sns&utm_medium=twitter&utm_id=2"
)
real = parse_link(url)
assert real == URL("https://www.hoyolab.pp.ua/article/25091304")
@staticmethod
async def test_miyoushe_desktop():
url = URL("https://www.miyoushe.com/sr/article/43966902")
real = parse_link(url)
assert real == URL("https://www.miyoushe.pp.ua/sr/article/43966902")
@staticmethod
async def test_miyoushe_android():
url = URL("https://m.miyoushe.com/sr?channel=beta/#/article/43966902")
real = parse_link(url)
assert real == URL("https://www.miyoushe.pp.ua/sr/article/43966902")