From 1051b0b48645d4422e98bcc686ab2bd163443a2c Mon Sep 17 00:00:00 2001
From: omg-xtao <100690902+omg-xtao@users.noreply.github.com>
Date: Sun, 3 Mar 2024 17:23:17 +0800
Subject: [PATCH] :sparkles: Support code post to channel
---
.../apihelper/client/components/hyperion.py | 44 ++-
modules/apihelper/models/genshin/hyperion.py | 33 +-
plugins/admin/post_code.py | 335 ++++++++++++++++++
3 files changed, 410 insertions(+), 2 deletions(-)
create mode 100644 plugins/admin/post_code.py
diff --git a/modules/apihelper/client/components/hyperion.py b/modules/apihelper/client/components/hyperion.py
index 33a8005..4fc3d50 100644
--- a/modules/apihelper/client/components/hyperion.py
+++ b/modules/apihelper/client/components/hyperion.py
@@ -1,10 +1,11 @@
import asyncio
import os
import re
+from time import time
from typing import List
from ..base.hyperionrequest import HyperionRequest
-from ...models.genshin.hyperion import PostInfo, ArtworkImage
+from ...models.genshin.hyperion import PostInfo, ArtworkImage, LiveInfo, LiveCode, LiveCodeHoYo
from ...typedefs import JSON_DATA
__all__ = ("Hyperion",)
@@ -20,6 +21,9 @@ class Hyperion:
POST_FULL_IN_COLLECTION_URL = "https://bbs-api.miyoushe.com/post/wapi/getPostFullInCollection"
GET_NEW_LIST_URL = "https://bbs-api.miyoushe.com/post/wapi/getNewsList"
GET_OFFICIAL_RECOMMENDED_POSTS_URL = "https://bbs-api.miyoushe.com/post/wapi/getOfficialRecommendedPosts"
+ LIVE_INFO_URL = "https://api-takumi.mihoyo.com/event/miyolive/index"
+ LIVE_CODE_URL = "https://api-takumi-static.mihoyo.com/event/miyolive/refreshCode"
+ LIVE_CODE_HOYO_URL = "https://bbs-api-os.hoyolab.com/community/painter/wapi/circle/channel/guide/material"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
@@ -139,5 +143,43 @@ class Hyperion:
response = await self.client.get(url=self.GET_NEW_LIST_URL, params=params)
return response
+ async def get_live_info(self, act_id: str) -> LiveInfo:
+ headers = {"x-rpc-act_id": act_id}
+ response = await self.client.get(url=self.LIVE_INFO_URL, headers=headers)
+ return LiveInfo(**response["live"])
+
+ async def get_live_code(self, act_id: str, ver_code: str) -> List[LiveCode]:
+ headers = {"x-rpc-act_id": act_id}
+ params = {
+ "version": ver_code,
+ "time": str(int(time())),
+ }
+ response = await self.client.get(url=self.LIVE_CODE_URL, headers=headers, params=params)
+ codes = []
+ for code_data in response.get("code_list", []):
+ codes.append(LiveCode(**code_data))
+ return codes
+
+ async def get_live_code_hoyo(self, gid: int) -> List[LiveCodeHoYo]:
+ headers = self.get_headers("https://www.hoyolab.com/")
+ headers.update(
+ {
+ "x-rpc-app_version": "2.50.0",
+ "x-rpc-client_type": "4",
+ "x-rpc-language": "zh-cn",
+ }
+ )
+ params = {
+ "game_id": str(gid),
+ }
+ codes = []
+ response = await self.client.get(url=self.LIVE_CODE_HOYO_URL, headers=headers, params=params)
+ for module in response.get("modules", []):
+ if exchange_group := module.get("exchange_group"):
+ for code_data in exchange_group.get("bonuses", []):
+ codes.append(LiveCodeHoYo(**code_data))
+ break
+ return codes
+
async def close(self):
await self.client.shutdown()
diff --git a/modules/apihelper/models/genshin/hyperion.py b/modules/apihelper/models/genshin/hyperion.py
index 93296a2..eefa487 100644
--- a/modules/apihelper/models/genshin/hyperion.py
+++ b/modules/apihelper/models/genshin/hyperion.py
@@ -1,10 +1,11 @@
+from datetime import datetime
from io import BytesIO
from typing import Any, List, Optional
from PIL import Image, UnidentifiedImageError
from pydantic import BaseModel, PrivateAttr
-__all__ = ("ArtworkImage", "PostInfo")
+__all__ = ("ArtworkImage", "PostInfo", "LiveInfo", "LiveCode", "LiveCodeHoYo")
class ArtworkImage(BaseModel):
@@ -90,3 +91,33 @@ class PostInfo(BaseModel):
def __getitem__(self, item):
return self._data[item]
+
+
+class LiveInfo(BaseModel):
+ act_type: str
+ title: str
+ live_time: str
+ start: datetime
+ end: datetime
+ remain: int
+ now: datetime
+ is_end: bool
+ code_ver: str
+
+
+class LiveCode(BaseModel):
+ code: str
+ to_get_time: datetime
+
+ @property
+ def text(self) -> str:
+ return self.code if self.code else "XXXXXXXXXXXX"
+
+
+class LiveCodeHoYo(BaseModel):
+ exchange_code: str
+ offline_at: datetime
+
+ @property
+ def text(self) -> str:
+ return self.exchange_code if self.exchange_code else "XXXXXXXXXXXX"
diff --git a/plugins/admin/post_code.py b/plugins/admin/post_code.py
new file mode 100644
index 0000000..0ac553b
--- /dev/null
+++ b/plugins/admin/post_code.py
@@ -0,0 +1,335 @@
+import json
+import re
+from datetime import datetime, timezone, timedelta
+from typing import List, Optional, Tuple, TYPE_CHECKING, Dict
+
+from httpx import Timeout
+from telegram import (
+ ReplyKeyboardMarkup,
+ ReplyKeyboardRemove,
+)
+from telegram.constants import ParseMode
+from telegram.error import BadRequest, Forbidden
+from telegram.ext import ConversationHandler, filters
+
+from core.config import config
+from core.plugin import Plugin, conversation, handler
+from modules.apihelper.client.components.hyperion import Hyperion
+from modules.apihelper.error import APIHelperException
+from utils.log import logger
+
+if TYPE_CHECKING:
+ from telegram import Update, Message
+ from telegram.ext import ContextTypes, Job
+ from modules.apihelper.models.genshin.hyperion import LiveCode, LiveCodeHoYo
+
+
+class PostCodeHandlerData:
+ def __init__(self):
+ self.version: str = ""
+ self.act_id = ""
+ self.ver_code = ""
+ self.mys_code: List["LiveCode"] = []
+ self.hoyo_code: List["LiveCodeHoYo"] = []
+ self.channel_username: str = ""
+ self.channel_id: int = -1
+ self.channel_msg: Optional["Message"] = None
+ self.need_update: bool = False
+ self.end_task_time: datetime = self.utc_8(datetime.now() + timedelta(hours=1))
+
+ def need_end_task(self) -> bool:
+ if not self.real_need_update():
+ return True
+ now = self.utc_8(datetime.now())
+ out_of_time = now >= self.end_task_time
+ if out_of_time:
+ logger.warning("PostCode 定时任务超过最大允许时间,结束任务")
+ return out_of_time
+
+ @staticmethod
+ def utc_8(time: datetime) -> datetime:
+ return time.astimezone(timezone(timedelta(hours=8)))
+
+ def get_end_time(self) -> str:
+ for code in self.hoyo_code:
+ if code.offline_at:
+ utc_8 = self.utc_8(code.offline_at)
+ return utc_8.strftime("%Y-%m-%d %H:%M:%S")
+ return "未知时间"
+
+ def get_guess_last_time(self) -> datetime:
+ return self.utc_8(self.mys_code[-1].to_get_time)
+
+ def get_need_update_text(self):
+ time = []
+ for code in self.mys_code:
+ time.append(self.utc_8(code.to_get_time).strftime("%H:%M:%S"))
+ return UPDATE_TEMPLATE % tuple(time)
+
+ def real_need_update(self) -> bool:
+ return not all([i.code for i in self.mys_code] + [i.exchange_code for i in self.hoyo_code])
+
+ def get_code_text(self) -> List[str]:
+ return [code.text for code in self.mys_code] + [code.text for code in self.hoyo_code]
+
+ def have_changes(self, mys_code: List["LiveCode"], hoyo_code: List["LiveCodeHoYo"]) -> bool:
+ if len(mys_code) != len(self.mys_code) or len(hoyo_code) != len(self.hoyo_code):
+ return True
+ for i, code in enumerate(mys_code):
+ if code.code != self.mys_code[i].code:
+ return True
+ for i, code in enumerate(hoyo_code):
+ if code.exchange_code != self.hoyo_code[i].exchange_code:
+ return True
+ return False
+
+ def get_text(self) -> str:
+ return POST_TEMPLATE % (
+ self.version,
+ *self.get_code_text(),
+ self.get_end_time(),
+ )
+
+
+SEND_POST, CHECK_COMMAND, GET_POST_CHANNEL = range(10900, 10903)
+POST_TEMPLATE = """《原神》%s 版本前瞻特别节目兑换码
+
+国服:
+%s
- 原石 ×100,精锻用魔矿 x10
+%s
- 原石 ×100,大英雄的经验 x5
+%s
- 原石 ×100,摩拉 x50000
+
+国际服:
+%s
- 原石 ×100,精锻用魔矿 x10
+%s
- 原石 ×100,大英雄的经验 x5
+%s
- 原石 ×100,摩拉 x50000
+
+兑换码过期时间 %s UTC+8,请尽快领取。"""
+UPDATE_TEMPLATE = """可能的兑换码发放时间:
+
+%s、%s、%s
+
+更新可能延迟三到五分钟,请耐心等待。"""
+
+
+class PostCode(Plugin.Conversation):
+ """版本前瞻特别节目兑换码推送"""
+
+ MENU_KEYBOARD = ReplyKeyboardMarkup([["推送频道", "推送并且定时更新"], ["退出"]], True, True)
+ SUBJECT_RE = re.compile(r"一起来看《原神》(\d+\.\d+)版本前瞻特别节目吧!")
+ ACT_RE = re.compile(r"act_id=(.*?)&")
+
+ def __init__(self):
+ self.gids = 2
+ self.type_id = 3
+
+ @staticmethod
+ def get_bbs_client() -> Hyperion:
+ return Hyperion(
+ timeout=Timeout(
+ connect=config.connect_timeout,
+ read=config.read_timeout,
+ write=config.write_timeout,
+ pool=config.pool_timeout,
+ ),
+ )
+
+ def init_version(self, news: List[Dict]) -> Tuple[Optional[str], Optional[Dict]]:
+ for new in news:
+ post = new.get("post", {})
+ if not post:
+ continue
+ if not (subject := post.get("subject")):
+ continue
+ if not (match := self.SUBJECT_RE.match(subject)):
+ continue
+ return match.group(1), post
+ return None, None
+
+ def init_act_id(self, post: Dict) -> Optional[str]:
+ structured_content = post.get("structured_content")
+ if not structured_content:
+ return None
+ structured_data = json.loads(structured_content)
+ for item in structured_data:
+ if not (attributes := item.get("attributes")):
+ continue
+ if not (link := attributes.get("link")):
+ continue
+ if not (match := self.ACT_RE.search(link)):
+ continue
+ return match.group(1)
+ return None
+
+ async def init(self, post_code_handler_data: PostCodeHandlerData) -> bool:
+ """解析 act_id ver_code 以及目标游戏版本"""
+ client = self.get_bbs_client()
+ try:
+ news = await client.get_new_list(self.gids, self.type_id)
+ version, final_post = self.init_version(news.get("list", []))
+ if not final_post:
+ raise ValueError("未找到版本前瞻特别节目文章")
+ act_id = self.init_act_id(final_post)
+ if not act_id:
+ raise ValueError("未找到文章中的 act_id")
+ live_info = await client.get_live_info(act_id)
+ ver_code = live_info.code_ver
+ post_code_handler_data.version = version
+ post_code_handler_data.act_id = act_id
+ post_code_handler_data.ver_code = ver_code
+ post_code_handler_data.mys_code = await client.get_live_code(act_id, ver_code)
+ post_code_handler_data.hoyo_code = await client.get_live_code_hoyo(self.gids)
+ if len(post_code_handler_data.mys_code) != 3 or len(post_code_handler_data.hoyo_code) != 3:
+ raise ValueError("获取兑换码数据成功,但是数量不对")
+ return True
+ finally:
+ await client.close()
+
+ @conversation.entry_point
+ @handler.command(command="post_code", filters=filters.ChatType.PRIVATE, block=False, admin=True)
+ async def command_start(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> int:
+ user = update.effective_user
+ message = update.effective_message
+ logger.info("用户 %s[%s] POST_CODE 命令请求", user.full_name, user.id)
+ post_code_handler_data = PostCodeHandlerData()
+ context.chat_data["post_code_handler_data"] = post_code_handler_data
+ text = f"✿✿ヽ(°▽°)ノ✿ 你好! {user.username} ,正在尝试自动获取必要的信息,请耐心等待。。。"
+ reply = await message.reply_text(text)
+ try:
+ result = await self.init(post_code_handler_data)
+ if not result:
+ await reply.edit_text("初始化基础信息失败,请检查是否有直播正在进行")
+ return ConversationHandler.END
+ await reply.delete()
+ await message.reply_text(post_code_handler_data.get_text(), parse_mode=ParseMode.HTML)
+ await message.reply_text("初始化信息完成,请选择你的操作", reply_markup=self.MENU_KEYBOARD)
+ return CHECK_COMMAND
+ except (APIHelperException, ValueError) as exc:
+ await reply.edit_text(f"初始化基础信息失败,错误信息:{str(exc)}")
+ return ConversationHandler.END
+
+ @conversation.state(state=CHECK_COMMAND)
+ @handler.message(filters=filters.TEXT & ~filters.COMMAND, block=False)
+ async def check_command(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> int:
+ post_code_handler_data: PostCodeHandlerData = context.chat_data.get("post_code_handler_data")
+ message = update.effective_message
+ if message.text == "退出":
+ await message.reply_text("退出任务", reply_markup=ReplyKeyboardRemove())
+ return ConversationHandler.END
+ if message.text == "推送频道":
+ return await self.get_channel(update, context)
+ if message.text == "推送并且定时更新":
+ if not post_code_handler_data.real_need_update():
+ await message.reply_text("所有兑换码已发放,无需创建更新任务,将直接推送。", reply_markup=ReplyKeyboardRemove())
+ return await self.get_channel(update, context)
+ post_code_handler_data.need_update = True
+ await message.reply_text(post_code_handler_data.get_need_update_text())
+ return await self.get_channel(update, context)
+ return ConversationHandler.END
+
+ async def get_channel(self, update: "Update", _: "ContextTypes.DEFAULT_TYPE") -> int:
+ message = update.effective_message
+ reply_keyboard = []
+ try:
+ for channel_id in config.channels:
+ chat = await self.get_chat(chat_id=channel_id)
+ reply_keyboard.append([f"{chat.username}"])
+ except KeyError as error:
+ logger.error("从配置文件获取频道信息发生错误,退出任务", exc_info=error)
+ await message.reply_text("从配置文件获取频道信息发生错误,退出任务", reply_markup=ReplyKeyboardRemove())
+ return ConversationHandler.END
+ await message.reply_text("请选择你要推送的频道", reply_markup=ReplyKeyboardMarkup(reply_keyboard, True, True))
+ return GET_POST_CHANNEL
+
+ @conversation.state(state=GET_POST_CHANNEL)
+ @handler.message(filters=filters.TEXT & ~filters.COMMAND, block=False)
+ async def get_post_channel(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> int:
+ post_code_handler_data: PostCodeHandlerData = context.chat_data.get("post_code_handler_data")
+ message = update.effective_message
+ channel_id = -1
+ try:
+ for channel_chat_id in config.channels:
+ chat = await self.get_chat(chat_id=channel_chat_id)
+ if message.text == chat.username:
+ channel_id = channel_chat_id
+ except KeyError as exc:
+ logger.error("从配置文件获取频道信息发生错误,退出任务", exc_info=exc)
+ logger.exception(exc)
+ await message.reply_text("从配置文件获取频道信息发生错误,退出任务", reply_markup=ReplyKeyboardRemove())
+ return ConversationHandler.END
+ if channel_id == -1:
+ await message.reply_text("获取频道信息失败,请检查你输入的内容是否正确", reply_markup=ReplyKeyboardRemove())
+ return ConversationHandler.END
+ post_code_handler_data.channel_username = message.text
+ post_code_handler_data.channel_id = channel_id
+ reply_keyboard = [["确认", "退出"]]
+ await message.reply_text("请核对你修改的信息", reply_markup=ReplyKeyboardMarkup(reply_keyboard, True, True))
+ return SEND_POST
+
+ @conversation.state(state=SEND_POST)
+ @handler.message(filters=filters.TEXT & ~filters.COMMAND, block=False)
+ async def send_post(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> int:
+ post_code_handler_data: PostCodeHandlerData = context.chat_data.get("post_code_handler_data")
+ message = update.effective_message
+ if message.text == "退出":
+ await message.reply_text(text="退出任务", reply_markup=ReplyKeyboardRemove())
+ return ConversationHandler.END
+ await message.reply_text("正在推送", reply_markup=ReplyKeyboardRemove())
+ channel_id, channel_username = post_code_handler_data.channel_id, post_code_handler_data.channel_username
+ post_text = post_code_handler_data.get_text()
+ post_text += f"\n\n@{channel_username}"
+ try:
+ msg = await context.bot.send_message(channel_id, post_text, parse_mode=ParseMode.HTML)
+ if post_code_handler_data.need_update:
+ post_code_handler_data.channel_msg = msg
+ end_time = post_code_handler_data.get_guess_last_time() + timedelta(minutes=10)
+ post_code_handler_data.end_task_time = end_time
+ self.create_task(post_code_handler_data)
+ except BadRequest as exc:
+ await message.reply_text(f"发送消息时发生错误 {exc.message}", reply_markup=ReplyKeyboardRemove())
+ logger.error("PostCode 模块发送消息时发生错误 %s", exc.message)
+ return ConversationHandler.END
+ await message.reply_text("推送成功", reply_markup=ReplyKeyboardRemove())
+ return ConversationHandler.END
+
+ def create_task(self, data: "PostCodeHandlerData"):
+ logger.debug("创建 PostCode 定时任务")
+ self.application.job_queue.run_once(self.post_code_task, 60, data=data)
+
+ async def post_code_task(self, context: "ContextTypes.DEFAULT_TYPE"):
+ client = self.get_bbs_client()
+ job: "Job" = context.job
+ if not isinstance(job.data, PostCodeHandlerData):
+ return
+ post_code_handler_data: "PostCodeHandlerData" = job.data
+ if not post_code_handler_data.channel_msg:
+ return
+ act_id = post_code_handler_data.act_id
+ ver_code = post_code_handler_data.ver_code
+ channel_username = post_code_handler_data.channel_username
+ try:
+ mys_code = await client.get_live_code(act_id, ver_code)
+ hoyo_code = await client.get_live_code_hoyo(self.gids)
+ if len(post_code_handler_data.mys_code) != 3 or len(post_code_handler_data.hoyo_code) != 3:
+ raise ValueError("获取兑换码数据成功,但是数量不对")
+ if post_code_handler_data.have_changes(mys_code, hoyo_code):
+ post_code_handler_data.mys_code = mys_code
+ post_code_handler_data.hoyo_code = hoyo_code
+ post_text = post_code_handler_data.get_text()
+ post_text += f"\n\n@{channel_username}"
+ await post_code_handler_data.channel_msg.edit_text(post_text, parse_mode=ParseMode.HTML)
+ logger.success("PostCode 兑换码发生变化,已更新频道消息")
+ else:
+ logger.debug("PostCode 兑换码未发生变化")
+ except (APIHelperException, ValueError) as exc:
+ logger.error("获取兑换码时发生错误 %s", str(exc))
+ except BadRequest as exc:
+ logger.error("自动更新兑换码消息失败 Message[%s]", exc.message)
+ except Forbidden as exc:
+ logger.error("自动更新兑换码消息失败 message[%s]", exc.message)
+ except Exception as exc:
+ logger.error("自动更新兑换码消息失败", exc_info=exc)
+ if post_code_handler_data.need_end_task():
+ logger.success("PostCode 定时任务结束")
+ else:
+ self.create_task(post_code_handler_data)