post 插件添加定时推送功能

This commit is contained in:
洛水居室 2022-10-13 15:38:47 +08:00 committed by GitHub
parent 3fe62c0100
commit b122d840f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 16 deletions

View File

@ -16,19 +16,19 @@ REDIS_DB=0
# 联系 https://t.me/BotFather 使用 /newbot 命令创建机器人并获取 token
BOT_TOKEN="xxxxxxx"
# 记录错误并发送消息通知开发人员 可选配置项
ERROR_NOTIFICATION_CHAT_ID=chat_id
# 文章推送群组 可选配置项
CHANNELS=[{ "name": "", "chat_id": 1}]
# bot 管理员
ADMINS=[{ "username": "", "user_id": -1 }]
# 群验证功能 可选配置项
VERIFY_GROUPS=[]
# 记录错误并发送消息通知开发人员 可选配置项
# ERROR_NOTIFICATION_CHAT_ID=chat_id
# logger 配置
# 文章推送群组 可选配置项
# CHANNELS=[{ "name": "", "chat_id": 1}]
# 群验证功能 可选配置项
# VERIFY_GROUPS=[]
# logger 配置 可选配置项
LOGGER_WIDTH=180
LOGGER_LOG_PATH="logs"
LOGGER_TIME_FORMAT="[%Y-%m-%d %X]"
@ -36,11 +36,11 @@ LOGGER_TRACEBACK_MAX_FRAMES=20
LOGGER_RENDER_KEYWORDS=["BOT"]
# mtp 客户端 可选配置项
API_ID=12345
API_HASH="abcdefg"
# API_ID=12345
# API_HASH="abcdefg"
# ENKA_NETWORK_API 可选配置项
ENKA_NETWORK_API_AGENT=""
# ENKA_NETWORK_API_AGENT=""
# Web Server
# 目前只用于预览模板,仅开发环境启动

View File

@ -5,8 +5,8 @@ from json import JSONDecodeError
from typing import List, Optional, Dict
from genshin import Client, InvalidCookies
from genshin.utility.uid import recognize_genshin_server
from genshin.utility.ds import generate_dynamic_secret
from genshin.utility.uid import recognize_genshin_server
from httpx import AsyncClient
from modules.apihelper.base import ArtworkImage, PostInfo
@ -25,6 +25,8 @@ class Hyperion:
POST_FULL_URL = "https://bbs-api.mihoyo.com/post/wapi/getPostFull"
POST_FULL_IN_COLLECTION_URL = "https://bbs-api.mihoyo.com/post/wapi/getPostFullInCollection"
GET_NEW_LIST_URL = "https://bbs-api.mihoyo.com/post/wapi/getNewsList"
GET_OFFICIAL_RECOMMENDED_POSTS_URL = "https://bbs-api.mihoyo.com/post/wapi/getOfficialRecommendedPosts"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/90.0.4430.72 Safari/537.36"
@ -89,6 +91,11 @@ class Hyperion:
)
return {"x-oss-process": params}
async def get_official_recommended_posts(self, gids: int) -> JSONDict:
params = {"gids": gids}
response = await self.client.get(url=self.GET_OFFICIAL_RECOMMENDED_POSTS_URL, params=params)
return response
async def get_post_full_in_collection(self, collection_id: int, gids: int = 2, order_type=1) -> JSONDict:
params = {"collection_id": collection_id, "gids": gids, "order_type": order_type}
response = await self.client.get(url=self.POST_FULL_IN_COLLECTION_URL, params=params)

View File

@ -1,7 +1,15 @@
from typing import Optional, List
from typing import Optional, List, Tuple
from bs4 import BeautifulSoup
from telegram import Update, ReplyKeyboardMarkup, ReplyKeyboardRemove, InputMediaPhoto
from telegram import (
Update,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
InputMediaPhoto,
InlineKeyboardButton,
InlineKeyboardMarkup,
Message,
)
from telegram.constants import ParseMode, MessageLimit
from telegram.error import BadRequest
from telegram.ext import CallbackContext, ConversationHandler, filters
@ -9,8 +17,10 @@ from telegram.helpers import escape_markdown
from core.baseplugin import BasePlugin
from core.bot import bot
from core.config import config
from core.plugin import Plugin, conversation, handler
from modules.apihelper.base import ArtworkImage
from modules.apihelper.error import APIHelperException
from modules.apihelper.hyperion import Hyperion
from utils.decorators.admins import bot_admins_rights_check
from utils.decorators.error import error_callable
@ -38,6 +48,95 @@ class Post(Plugin.Conversation, BasePlugin.Conversation):
def __init__(self):
self.bbs = Hyperion()
self.last_post_id_list: List[int] = []
if config.channels is not None and len(config.channels) > 0:
logger.success("文章定时推送处理已经开启")
bot.app.job_queue.run_repeating(self.task, 60 * 3)
async def task(self, context: CallbackContext):
temp_post_id_list: List[int] = []
# 请求推荐POST列表并处理
try:
official_recommended_posts = await self.bbs.get_official_recommended_posts(2)
except APIHelperException as exc:
logger.error(f"获取首页推荐信息失败 {repr(exc)}")
return
for data_list in official_recommended_posts["list"]:
temp_post_id_list.append(data_list["post_id"])
# 判断是否为空
if len(self.last_post_id_list) == 0:
for temp_list in temp_post_id_list:
self.last_post_id_list.append(temp_list)
return
# 筛选出新推送的文章
new_post_id_list = set(temp_post_id_list).difference(set(self.last_post_id_list))
if len(new_post_id_list) == 0:
return
self.last_post_id_list = temp_post_id_list
for post_id in temp_post_id_list:
try:
post_info = await self.bbs.get_post_info(2, post_id)
except APIHelperException as exc:
logger.error(f"获取文章信息失败 {repr(exc)}")
text = f"获取 post_id[{post_id}] 文章信息失败 {repr(exc)}"
for user in config.admins:
try:
await context.bot.send_message(user.user_id, text)
except BadRequest as _exc:
logger.error(f"发送消息失败 {repr(_exc)}")
return
buttons = [
[
InlineKeyboardButton("确认", callback_data=f"post_admin|confirm|{post_info.post_id}"),
InlineKeyboardButton("取消", callback_data=f"post_admin|cancel|{post_info.post_id}"),
]
]
url = f"https://bbs.mihoyo.com/ys/article/{post_info.post_id}"
text = f"发现官网推荐文章 <a href='{url}'>{post_info.subject}</a>\n是否开始处理"
for user in config.admins:
try:
await context.bot.send_message(user.user_id, text, reply_markup=InlineKeyboardMarkup(buttons))
except BadRequest as exc:
logger.error(f"发送消息失败 {repr(exc)}")
@conversation.entry_point
@handler.callback_query(pattern=r"^post_admin\|", block=False)
@bot_admins_rights_check
@error_callable
async def callback_query_start(self, update: Update, context: CallbackContext) -> int:
post_handler_data: PostHandlerData = context.chat_data.get("post_handler_data")
callback_query = update.callback_query
user = callback_query.from_user
message = callback_query.message
logger.info(f"用户 {user.full_name}[{user.id}] POST命令请求")
async def get_post_admin_callback(callback_query_data: str) -> Tuple[str, int]:
_data = callback_query_data.split("|")
_result = _data[1]
_post_id = int(_data[2])
logger.debug(f"callback_query_data函数返回 result[{_result}] post_id[{_post_id}]")
return _result, _post_id
result, post_id = await get_post_admin_callback(callback_query.data)
if result == "cancel":
await message.reply_text("操作已经取消")
await message.delete()
elif result == "confirm":
reply_text = await message.reply_text("正在处理")
status = await self.send_post_info(post_handler_data, message, post_id)
await reply_text.delete()
return status
await message.reply_text("非法参数")
return ConversationHandler.END
@conversation.entry_point
@handler.command(command="post", filters=filters.ChatType.PRIVATE, block=True)
@ -71,6 +170,9 @@ class Post(Plugin.Conversation, BasePlugin.Conversation):
if post_id == -1:
await message.reply_text("获取作品ID错误请检查连接是否合法", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
return await self.send_post_info(post_handler_data, message, post_id)
async def send_post_info(self, post_handler_data: PostHandlerData, message: Message, post_id: int) -> int:
post_info = await self.bbs.get_post_info(2, post_id)
post_images = await self.bbs.get_images_by_post_id(2, post_id)
post_data = post_info["post"]["post"]

View File

@ -43,3 +43,16 @@ async def test_get_post_info(hyperion):
async def test_get_images_by_post_id(hyperion):
post_images = await hyperion.get_images_by_post_id(2, 29023709)
assert len(post_images) == 1
# noinspection PyShadowingNames
@pytest.mark.asyncio
@flaky(3, 1)
async def test_official_recommended_posts(hyperion):
official_recommended_posts = await hyperion.get_official_recommended_posts(2)
assert len(official_recommended_posts["list"]) > 0
for data_list in official_recommended_posts["list"]:
post_info = await hyperion.get_post_info(2, data_list["post_id"])
assert post_info.post_id
assert post_info.subject
LOGGER.info("official_recommended_posts: post_id[%s] subject[%s]", post_info.post_id, post_info.subject)