♻ Refactor sign system

* ♻ 重写签到系统

Co-authored-by: xtaodada <xtao@xtaolink.cn>
This commit is contained in:
洛水居室 2022-10-30 21:35:53 +08:00 committed by GitHub
parent 53f30a8f85
commit 5a64d6a068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 171 additions and 179 deletions

View File

@ -1,11 +1,14 @@
import asyncio
import datetime import datetime
import json import json
import random
import re import re
import time import time
from json import JSONDecodeError from json import JSONDecodeError
from typing import Optional, Dict, Tuple from typing import Optional, Dict, Tuple
from genshin import Game, GenshinException, AlreadyClaimed, Client from genshin import Game, GenshinException, AlreadyClaimed, Client
from genshin.utility import recognize_genshin_server
from httpx import AsyncClient, TimeoutException from httpx import AsyncClient, TimeoutException
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ChatAction from telegram.constants import ChatAction
@ -31,40 +34,54 @@ from utils.helpers import get_genshin_client
from utils.log import logger from utils.log import logger
class SignRedis: class NeedChallenge(Exception):
client = bot.find_service(RedisDB).client def __init__(self, uid: int, gt: str = "", challenge: str = ""):
qname = "plugin:sign:" super().__init__()
self.uid = uid
self.gt = gt
self.challenge = challenge
@staticmethod
async def get(uid: int) -> Tuple[Optional[str], Optional[str]]: class SignSystem:
data = await SignRedis.client.get(f"{SignRedis.qname}{uid}") def __init__(self, redis: RedisDB):
self.cache = redis.client
self.qname = "plugin:sign:"
async def get_challenge(self, uid: int) -> Tuple[Optional[str], Optional[str]]:
data = await self.cache.get(f"{self.qname}{uid}")
if not data: if not data:
return None, None return None, None
data = data.decode("utf-8").split("|") data = data.decode("utf-8").split("|")
return data[0], data[1] return data[0], data[1]
@staticmethod async def set_challenge(self, uid: int, gt: str, challenge: str):
async def set(uid: int, gt: str, challenge: str): await self.cache.set(f"{self.qname}{uid}", f"{gt}|{challenge}")
await SignRedis.client.set(f"{SignRedis.qname}{uid}", f"{gt}|{challenge}") await self.cache.expire(f"{self.qname}{uid}", 10 * 60)
await SignRedis.client.expire(f"{SignRedis.qname}{uid}", 10 * 60)
async def gen_challenge_header(self, uid: int, validate: str) -> Optional[Dict]:
_, challenge = await self.get_challenge(uid)
if not challenge or not validate:
return
return {
"x-rpc-challenge": challenge,
"x-rpc-validate": validate,
"x-rpc-seccode": f"{validate}|jordan",
}
class Sign(Plugin, BasePlugin): async def gen_challenge_button(
"""每日签到""" self, uid: int, user_id: int, gt: Optional[str] = None, challenge: Optional[str] = None
) -> Optional[InlineKeyboardMarkup]:
CHECK_SERVER, COMMAND_RESULT = range(10400, 10402) if not config.pass_challenge_user_web:
return None
def __init__( if gt and challenge:
self, await self.set_challenge(uid, gt, challenge)
user_service: UserService = None, data = f"sign|{user_id}|{uid}"
cookies_service: CookiesService = None, return InlineKeyboardMarkup([[InlineKeyboardButton("请尽快点我进行手动验证", callback_data=data)]])
sign_service: SignServices = None, gt, challenge = await self.get_challenge(uid)
bot_admin_service: BotAdminService = None, if not challenge or not gt:
): return
self.bot_admin_service = bot_admin_service url = f"{config.pass_challenge_user_web}?username={bot.app.bot.username}&gt={gt}&challenge={challenge}"
self.cookies_service = cookies_service return InlineKeyboardMarkup([[InlineKeyboardButton("请尽快点我进行手动验证", url=url)]])
self.user_service = user_service
self.sign_service = sign_service
@staticmethod @staticmethod
async def pass_challenge(gt: str, challenge: str, referer: str = None) -> Optional[Dict]: async def pass_challenge(gt: str, challenge: str, referer: str = None) -> Optional[Dict]:
@ -172,45 +189,33 @@ class Sign(Plugin, BasePlugin):
logger.warning("签到 recognize 请求失败") logger.warning("签到 recognize 请求失败")
return None return None
@staticmethod
async def gen_challenge_header(uid: int, validate: str) -> Optional[Dict]:
_, challenge = await SignRedis.get(uid)
if not challenge or not validate:
return
return {
"x-rpc-challenge": challenge,
"x-rpc-validate": validate,
"x-rpc-seccode": f"{validate}|jordan",
}
@staticmethod
async def gen_challenge_button(uid: int, user_id: int, gt: str = None, challenge: str = None):
if not config.pass_challenge_user_web:
return None
if gt and challenge:
await SignRedis.set(uid, gt, challenge)
data = f"sign|{user_id}|{uid}"
return InlineKeyboardMarkup([[InlineKeyboardButton("请尽快点我进行手动验证", callback_data=data)]])
gt, challenge = await SignRedis.get(uid)
if not gt or not challenge:
return
url = f"{config.pass_challenge_user_web}?username={bot.app.bot.username}&gt={gt}&challenge={challenge}"
return InlineKeyboardMarkup([[InlineKeyboardButton("请尽快点我进行手动验证", url=url)]])
@staticmethod
async def start_sign( async def start_sign(
client: Client, user_id: int, headers: Dict = None self,
) -> Tuple[str, Optional[InlineKeyboardMarkup]]: client: Client,
headers: Optional[Dict] = None,
is_sleep: bool = False,
is_raise: bool = False,
title: Optional[str] = "签到结果",
) -> str:
if is_sleep:
if recognize_genshin_server(client.uid) in ("cn_gf01", "cn_qd01"):
await asyncio.sleep(random.randint(10, 300)) # nosec
else:
await asyncio.sleep(random.randint(0, 3)) # nosec
try: try:
rewards = await client.get_monthly_rewards(game=Game.GENSHIN, lang="zh-cn") rewards = await client.get_monthly_rewards(game=Game.GENSHIN, lang="zh-cn")
except GenshinException as error: except GenshinException as error:
logger.warning(f"UID {client.uid} 获取签到信息失败API返回信息为 {str(error)}") logger.warning(f"UID {client.uid} 获取签到信息失败API返回信息为 {str(error)}")
return f"获取签到信息失败API返回信息为 {str(error)}", None if is_raise:
raise error
return f"获取签到信息失败API返回信息为 {str(error)}"
try: try:
daily_reward_info = await client.get_reward_info(game=Game.GENSHIN, lang="zh-cn") # 获取签到信息失败 daily_reward_info = await client.get_reward_info(game=Game.GENSHIN, lang="zh-cn") # 获取签到信息失败
except GenshinException as error: except GenshinException as error:
logger.warning(f"UID {client.uid} 获取签到状态失败API返回信息为 {str(error)}") logger.warning(f"UID {client.uid} 获取签到状态失败API返回信息为 {str(error)}")
return f"获取签到状态失败API返回信息为 {str(error)}", None if is_raise:
raise error
return f"获取签到状态失败API返回信息为 {str(error)}"
if not daily_reward_info.signed_in: if not daily_reward_info.signed_in:
try: try:
request_daily_reward = await client.request_daily_reward( request_daily_reward = await client.request_daily_reward(
@ -221,7 +226,8 @@ class Sign(Plugin, BasePlugin):
headers=headers, headers=headers,
) )
if request_daily_reward and request_daily_reward.get("success", 0) == 1: if request_daily_reward and request_daily_reward.get("success", 0) == 1:
headers = await Sign.pass_challenge( # 尝试通过 ajax 请求绕过签到
headers = await self.pass_challenge(
request_daily_reward.get("gt", ""), request_daily_reward.get("gt", ""),
request_daily_reward.get("challenge", ""), request_daily_reward.get("challenge", ""),
) )
@ -233,23 +239,27 @@ class Sign(Plugin, BasePlugin):
headers=headers, headers=headers,
) )
if request_daily_reward and request_daily_reward.get("success", 0) == 1: if request_daily_reward and request_daily_reward.get("success", 0) == 1:
button = await Sign.gen_challenge_button( # 如果绕过失败 抛出异常 相关信息写入
client.uid, raise NeedChallenge(
user_id, uid=client.uid,
request_daily_reward.get("gt", ""), gt=request_daily_reward.get("gt", ""),
request_daily_reward.get("challenge", ""), challenge=request_daily_reward.get("challenge", ""),
) )
logger.warning(f"UID {client.uid} 签到失败,触发验证码风控")
return f"UID {client.uid} 签到失败,触发验证码风控,请尝试重新签到。", button
logger.info(f"UID {client.uid} 签到成功") logger.info(f"UID {client.uid} 签到成功")
except TimeoutException: except TimeoutException as error:
return "签到失败了呜呜呜 ~ 服务器连接超时 服务器熟啦 ~ ", None if is_raise:
except AlreadyClaimed: raise error
return "签到失败了呜呜呜 ~ 服务器连接超时 服务器熟啦 ~ "
except AlreadyClaimed as error:
logger.info(f"UID {client.uid} 已经签到") logger.info(f"UID {client.uid} 已经签到")
if is_raise:
raise error
result = "今天旅行者已经签到过了~" result = "今天旅行者已经签到过了~"
except GenshinException as error: except GenshinException as error:
logger.warning(f"UID {client.uid} 签到失败API返回信息为 {str(error)}") logger.warning(f"UID {client.uid} 签到失败API返回信息为 {str(error)}")
return f"获取签到状态失败API返回信息为 {str(error)}", None if is_raise:
raise error
return f"获取签到状态失败API返回信息为 {str(error)}"
else: else:
logger.info(f"UID {client.uid} 签到成功") logger.info(f"UID {client.uid} 签到成功")
result = "OK" result = "OK"
@ -265,13 +275,34 @@ class Sign(Plugin, BasePlugin):
if not daily_reward_info.signed_in: if not daily_reward_info.signed_in:
missed_days -= 1 missed_days -= 1
message = ( message = (
f"#### {today} (UTC+8) ####\n" f"#### {title} ####\n"
f"时间:{today} (UTC+8)\n"
f"UID: {client.uid}\n" f"UID: {client.uid}\n"
f"今日奖励: {reward.name} × {reward.amount}\n" f"今日奖励: {reward.name} × {reward.amount}\n"
f"本月漏签次数:{missed_days}\n" f"本月漏签次数:{missed_days}\n"
f"签到结果: {result}" f"签到结果: {result}"
) )
return message, None return message
class Sign(Plugin, BasePlugin):
"""每日签到"""
CHECK_SERVER, COMMAND_RESULT = range(10400, 10402)
def __init__(
self,
redis: RedisDB = None,
user_service: UserService = None,
cookies_service: CookiesService = None,
sign_service: SignServices = None,
bot_admin_service: BotAdminService = None,
):
self.bot_admin_service = bot_admin_service
self.cookies_service = cookies_service
self.user_service = user_service
self.sign_service = sign_service
self.system = SignSystem(redis)
async def _process_auto_sign(self, user_id: int, chat_id: int, method: str) -> str: async def _process_auto_sign(self, user_id: int, chat_id: int, method: str) -> str:
try: try:
@ -310,7 +341,7 @@ class Sign(Plugin, BasePlugin):
user = update.effective_user user = update.effective_user
message = update.effective_message message = update.effective_message
args = get_all_args(context) args = get_all_args(context)
validate = None validate: Optional[str] = None
if len(args) >= 1: if len(args) >= 1:
msg = None msg = None
if args[0] == "开启自动签到": if args[0] == "开启自动签到":
@ -335,10 +366,10 @@ class Sign(Plugin, BasePlugin):
self._add_delete_message_job(context, message.chat_id, message.message_id) self._add_delete_message_job(context, message.chat_id, message.message_id)
try: try:
client = await get_genshin_client(user.id) client = await get_genshin_client(user.id)
headers = await Sign.gen_challenge_header(client.uid, validate)
await message.reply_chat_action(ChatAction.TYPING) await message.reply_chat_action(ChatAction.TYPING)
sign_text, button = await self.start_sign(client, user.id, headers) headers = await self.system.gen_challenge_header(client.uid, validate)
reply_message = await message.reply_text(sign_text, allow_sending_without_reply=True, reply_markup=button) sign_text = await self.system.start_sign(client, headers)
reply_message = await message.reply_text(sign_text, allow_sending_without_reply=True)
if filters.ChatType.GROUPS.filter(reply_message): if filters.ChatType.GROUPS.filter(reply_message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id) self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id)
except (UserNotFoundError, CookiesNotFoundError): except (UserNotFoundError, CookiesNotFoundError):
@ -352,6 +383,18 @@ class Sign(Plugin, BasePlugin):
self._add_delete_message_job(context, message.chat_id, message.message_id, 30) self._add_delete_message_job(context, message.chat_id, message.message_id, 30)
else: else:
await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons)) await message.reply_text("未查询到您所绑定的账号信息,请先绑定账号", reply_markup=InlineKeyboardMarkup(buttons))
except NeedChallenge as exc:
button = await self.system.gen_challenge_button(
exc.uid,
user.id,
exc.gt,
exc.challenge,
)
reply_message = await message.reply_text(
f"UID {exc.uid} 签到失败,触发验证码风控,请尝试点击下方按钮重新签到", allow_sending_without_reply=True, reply_markup=button
)
if filters.ChatType.GROUPS.filter(reply_message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id)
@handler(CallbackQueryHandler, pattern=r"^sign\|", block=False) @handler(CallbackQueryHandler, pattern=r"^sign\|", block=False)
@restricts(restricts_time_of_groups=20, without_overlapping=True) @restricts(restricts_time_of_groups=20, without_overlapping=True)
@ -364,14 +407,14 @@ class Sign(Plugin, BasePlugin):
_data = callback_query_data.split("|") _data = callback_query_data.split("|")
_user_id = int(_data[1]) _user_id = int(_data[1])
_uid = int(_data[2]) _uid = int(_data[2])
logger.debug(f"callback_query_data 函数返回 user_id[{_user_id}] uid[{_uid}]") logger.debug(f"get_sign_callback 函数返回 user_id[{_user_id}] uid[{_uid}]")
return _user_id, _uid return _user_id, _uid
user_id, uid = await get_sign_callback(callback_query.data) user_id, uid = await get_sign_callback(callback_query.data)
if user.id != user_id: if user.id != user_id:
await callback_query.answer(text="这不是你的按钮!\n再乱点再按我叫西风骑士团、千岩军、天领奉和教令院了!", show_alert=True) await callback_query.answer(text="这不是你的按钮!\n" "再乱点再按我叫西风骑士团、千岩军、天领奉和教令院了!", show_alert=True)
return return
challenge = await SignRedis.get(uid) _, challenge = await self.system.get_challenge(uid)
if not challenge: if not challenge:
await callback_query.answer(text="验证请求已经过期,请重新发起签到!", show_alert=True) await callback_query.answer(text="验证请求已经过期,请重新发起签到!", show_alert=True)
return return

View File

@ -1,93 +1,36 @@
import asyncio
import datetime import datetime
import random
import time
from aiohttp import ClientConnectorError from aiohttp import ClientConnectorError
from genshin import Game, GenshinException, AlreadyClaimed, InvalidCookies from genshin import GenshinException, AlreadyClaimed, InvalidCookies
from genshin.utility import recognize_genshin_server
from httpx import TimeoutException from httpx import TimeoutException
from telegram.constants import ParseMode from telegram.constants import ParseMode
from telegram.error import BadRequest, Forbidden from telegram.error import BadRequest, Forbidden
from telegram.ext import CallbackContext from telegram.ext import CallbackContext
from core.base.redisdb import RedisDB
from core.cookies import CookiesService from core.cookies import CookiesService
from core.plugin import Plugin, job from core.plugin import Plugin, job
from core.sign.models import SignStatusEnum from core.sign.models import SignStatusEnum
from core.sign.services import SignServices from core.sign.services import SignServices
from core.user import UserService from core.user import UserService
from plugins.genshin.sign import Sign from plugins.genshin.sign import SignSystem, NeedChallenge
from plugins.system.errorhandler import notice_chat_id from plugins.system.errorhandler import notice_chat_id
from plugins.system.sign_status import SignStatus from plugins.system.sign_status import SignStatus
from utils.helpers import get_genshin_client
from utils.log import logger from utils.log import logger
class NeedChallenge(Exception):
pass
class SignJob(Plugin): class SignJob(Plugin):
def __init__( def __init__(
self, self,
sign_service: SignServices = None, sign_service: SignServices = None,
user_service: UserService = None, user_service: UserService = None,
cookies_service: CookiesService = None, cookies_service: CookiesService = None,
redis: RedisDB = None,
): ):
self.sign_service = sign_service self.sign_service = sign_service
self.cookies_service = cookies_service self.cookies_service = cookies_service
self.user_service = user_service self.user_service = user_service
self.sign_system = SignSystem(redis)
@staticmethod
async def single_sign(user_id: int) -> str:
client = await get_genshin_client(user_id)
if recognize_genshin_server(client.uid) in ("cn_gf01", "cn_qd01"):
await asyncio.sleep(random.randint(10, 300)) # nosec
else:
await asyncio.sleep(random.randint(0, 3)) # nosec
rewards = await client.get_monthly_rewards(game=Game.GENSHIN, lang="zh-cn")
daily_reward_info = await client.get_reward_info(game=Game.GENSHIN)
if not daily_reward_info.signed_in:
request_daily_reward = await client.request_daily_reward("sign", method="POST", game=Game.GENSHIN)
if request_daily_reward and request_daily_reward.get("success", 0) == 1:
headers = await Sign.pass_challenge(
request_daily_reward.get("gt", ""),
request_daily_reward.get("challenge", ""),
)
if not headers:
logger.warning(f"UID {client.uid} 签到失败,触发验证码风控")
raise NeedChallenge
request_daily_reward = await client.request_daily_reward(
"sign",
method="POST",
game=Game.GENSHIN,
lang="zh-cn",
headers=headers,
)
if request_daily_reward and request_daily_reward.get("success", 0) == 1:
logger.warning(f"UID {client.uid} 签到失败,触发验证码风控")
raise NeedChallenge
logger.info(f"UID {client.uid} 签到请求 {request_daily_reward} | 签到成功")
else:
logger.info(f"UID {client.uid} 签到请求 {request_daily_reward}")
result = "OK"
else:
result = "今天旅行者已经签到过了~"
reward = rewards[daily_reward_info.claimed_rewards - (1 if daily_reward_info.signed_in else 0)]
today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
cn_timezone = datetime.timezone(datetime.timedelta(hours=8))
now = datetime.datetime.now(cn_timezone)
missed_days = now.day - daily_reward_info.claimed_rewards
if not daily_reward_info.signed_in:
missed_days -= 1
return (
f"########### 定时签到 ###########\n"
f"#### {today} (UTC+8) ####\n"
f"UID: {client.uid}\n"
f"今日奖励: {reward.name} × {reward.amount}\n"
f"本月漏签次数:{missed_days}\n"
f"签到结果: {result}"
)
@job.run_daily(time=datetime.time(hour=0, minute=1, second=0), name="SignJob") @job.run_daily(time=datetime.time(hour=0, minute=1, second=0), name="SignJob")
async def sign(self, context: CallbackContext): async def sign(self, context: CallbackContext):
@ -108,7 +51,9 @@ class SignJob(Plugin):
if sign_db.status in [SignStatusEnum.STATUS_SUCCESS, SignStatusEnum.ALREADY_CLAIMED]: if sign_db.status in [SignStatusEnum.STATUS_SUCCESS, SignStatusEnum.ALREADY_CLAIMED]:
continue continue
try: try:
text = await self.single_sign(user_id) text = await self.sign_system.start_sign(
user_id, is_sleep=True, is_raise=True, title="自动签到" if context.job.name == "SignJob" else "自动重新签到"
)
sign_db.status = SignStatusEnum.STATUS_SUCCESS sign_db.status = SignStatusEnum.STATUS_SUCCESS
except InvalidCookies: except InvalidCookies:
text = "自动签到执行失败Cookie无效" text = "自动签到执行失败Cookie无效"

View File

@ -1,6 +1,4 @@
import asyncio
import datetime import datetime
import random
from aiohttp import ClientConnectorError from aiohttp import ClientConnectorError
from genshin import InvalidCookies, AlreadyClaimed, GenshinException from genshin import InvalidCookies, AlreadyClaimed, GenshinException
@ -9,12 +7,14 @@ from telegram.constants import ParseMode
from telegram.error import BadRequest, Forbidden from telegram.error import BadRequest, Forbidden
from telegram.ext import CommandHandler, CallbackContext from telegram.ext import CommandHandler, CallbackContext
from core.base.redisdb import RedisDB
from core.cookies import CookiesService from core.cookies import CookiesService
from core.plugin import Plugin, handler from core.plugin import Plugin, handler
from core.sign import SignServices from core.sign import SignServices
from core.sign.models import SignStatusEnum from core.sign.models import SignStatusEnum
from core.user import UserService from core.user import UserService
from plugins.jobs.sign import NeedChallenge, SignJob from plugins.genshin.sign import SignSystem
from plugins.jobs.sign import NeedChallenge
from utils.decorators.admins import bot_admins_rights_check from utils.decorators.admins import bot_admins_rights_check
from utils.log import logger from utils.log import logger
@ -25,10 +25,12 @@ class SignAll(Plugin):
sign_service: SignServices = None, sign_service: SignServices = None,
user_service: UserService = None, user_service: UserService = None,
cookies_service: CookiesService = None, cookies_service: CookiesService = None,
redis: RedisDB = None,
): ):
self.sign_service = sign_service self.sign_service = sign_service
self.cookies_service = cookies_service self.cookies_service = cookies_service
self.user_service = user_service self.user_service = user_service
self.sign_system = SignSystem(redis)
@handler(CommandHandler, command="sign_all", block=False) @handler(CommandHandler, command="sign_all", block=False)
@bot_admins_rights_check @bot_admins_rights_check
@ -42,7 +44,7 @@ class SignAll(Plugin):
user_id = sign_db.user_id user_id = sign_db.user_id
old_status = sign_db.status old_status = sign_db.status
try: try:
text = await SignJob.single_sign(user_id) text = await self.sign_system.start_sign(user_id, is_sleep=True, is_raise=True, title="自动重新签到")
except InvalidCookies: except InvalidCookies:
text = "自动签到执行失败Cookie无效" text = "自动签到执行失败Cookie无效"
sign_db.status = SignStatusEnum.INVALID_COOKIES sign_db.status = SignStatusEnum.INVALID_COOKIES
@ -67,10 +69,7 @@ class SignAll(Plugin):
if sign_db.chat_id < 0: if sign_db.chat_id < 0:
text = f'<a href="tg://user?id={sign_db.user_id}">NOTICE {sign_db.user_id}</a>\n\n{text}' text = f'<a href="tg://user?id={sign_db.user_id}">NOTICE {sign_db.user_id}</a>\n\n{text}'
try: try:
if "今天旅行者已经签到过了~" not in text: await context.bot.send_message(sign_db.chat_id, text, parse_mode=ParseMode.HTML)
await context.bot.send_message(sign_db.chat_id, text, parse_mode=ParseMode.HTML)
await asyncio.sleep(random.randint(10, 50)) # nosec
# 回复延迟 [10, 60] 避免触发洪水防御
except BadRequest as exc: except BadRequest as exc:
logger.error(f"执行自动签到时发生错误 用户UID[{user_id}]") logger.error(f"执行自动签到时发生错误 用户UID[{user_id}]")
logger.exception(exc) logger.exception(exc)

View File

@ -1,19 +1,22 @@
import contextlib from telegram import Update, ReplyKeyboardRemove, Message, User
from telegram import Update, ReplyKeyboardRemove
from telegram.constants import ChatAction from telegram.constants import ChatAction
from telegram.ext import CallbackContext, CommandHandler from telegram.ext import CallbackContext, CommandHandler
from telegram.helpers import escape_markdown from telegram.helpers import escape_markdown
from core.base.redisdb import RedisDB
from core.cookies.error import CookiesNotFoundError from core.cookies.error import CookiesNotFoundError
from core.plugin import handler, Plugin from core.plugin import handler, Plugin
from core.user.error import UserNotFoundError from core.user.error import UserNotFoundError
from plugins.genshin.sign import Sign from plugins.genshin.sign import SignSystem, NeedChallenge
from utils.decorators.restricts import restricts from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client from utils.helpers import get_genshin_client
from utils.log import logger
class StartPlugin(Plugin): class StartPlugin(Plugin):
def __init__(self, redis: RedisDB = None):
self.sign_system = SignSystem(redis)
@handler(CommandHandler, command="start", block=False) @handler(CommandHandler, command="start", block=False)
@restricts() @restricts()
async def start(self, update: Update, context: CallbackContext) -> None: async def start(self, update: Update, context: CallbackContext) -> None:
@ -37,39 +40,14 @@ class StartPlugin(Plugin):
f"{escape_markdown('发送 /setuid 或 /setcookie 命令进入绑定账号流程')}" f"{escape_markdown('发送 /setuid 或 /setcookie 命令进入绑定账号流程')}"
) )
elif args[0] == "sign": elif args[0] == "sign":
await StartPlugin.gen_sign_button(update) await self.gen_sign_button(message, user)
elif args[0].startswith("challenge_"): elif args[0].startswith("challenge_"):
await StartPlugin.process_sign_validate(update, args[0][10:]) await self.process_sign_validate(message, user, args[0][10:])
else: else:
await message.reply_html(f"你好 {user.mention_html()} !我是派蒙 \n请点击 /{args[0]} 命令进入对应流程") await message.reply_html(f"你好 {user.mention_html()} !我是派蒙 \n请点击 /{args[0]} 命令进入对应流程")
return return
await message.reply_markdown_v2(f"你好 {user.mention_markdown_v2()} {escape_markdown('!我是派蒙 ')}") await message.reply_markdown_v2(f"你好 {user.mention_markdown_v2()} {escape_markdown('!我是派蒙 ')}")
@staticmethod
async def gen_sign_button(update: Update):
with contextlib.suppress(UserNotFoundError, CookiesNotFoundError):
client = await get_genshin_client(update.effective_user.id)
await update.effective_message.reply_chat_action(ChatAction.TYPING)
button = await Sign.gen_challenge_button(client.uid, update.effective_user.id)
if not button:
await update.effective_message.reply_text("验证请求已过期。", allow_sending_without_reply=True)
return
await update.effective_message.reply_text(
"请尽快点击下方按钮进行验证。", allow_sending_without_reply=True, reply_markup=button
)
@staticmethod
async def process_sign_validate(update: Update, validate: str):
with contextlib.suppress(UserNotFoundError, CookiesNotFoundError):
client = await get_genshin_client(update.effective_user.id)
await update.effective_message.reply_chat_action(ChatAction.TYPING)
headers = await Sign.gen_challenge_header(client.uid, validate)
if not headers:
await update.effective_message.reply_text("验证请求已过期。", allow_sending_without_reply=True)
return
sign_text, button = await Sign.start_sign(client, update.effective_user.id, headers)
await update.effective_message.reply_text(sign_text, allow_sending_without_reply=True, reply_markup=button)
@staticmethod @staticmethod
@restricts() @restricts()
async def unknown_command(update: Update, _: CallbackContext) -> None: async def unknown_command(update: Update, _: CallbackContext) -> None:
@ -89,3 +67,30 @@ class StartPlugin(Plugin):
@restricts() @restricts()
async def reply_keyboard_remove(self, update: Update, _: CallbackContext) -> None: async def reply_keyboard_remove(self, update: Update, _: CallbackContext) -> None:
await update.message.reply_text("移除远程键盘成功", reply_markup=ReplyKeyboardRemove()) await update.message.reply_text("移除远程键盘成功", reply_markup=ReplyKeyboardRemove())
async def gen_sign_button(self, message: Message, user: User):
try:
client = await get_genshin_client(user.id)
await message.reply_chat_action(ChatAction.TYPING)
button = await self.sign_system.gen_challenge_button(client.uid, user.id)
if not button:
await message.reply_text("验证请求已过期。", allow_sending_without_reply=True)
return
await message.reply_text("请尽快点击下方按钮进行验证。", allow_sending_without_reply=True, reply_markup=button)
except (UserNotFoundError, CookiesNotFoundError):
logger.warning(f"用户 {user.full_name}[{user.id}] 账号信息未找到")
async def process_sign_validate(self, message: Message, user: User, validate: str):
try:
client = await get_genshin_client(user.id)
await message.reply_chat_action(ChatAction.TYPING)
headers = await self.sign_system.gen_challenge_header(client.uid, validate)
if not headers:
await message.reply_text("验证请求已过期。", allow_sending_without_reply=True)
return
sign_text = await self.sign_system.start_sign(client, headers=headers)
await message.reply_text(sign_text, allow_sending_without_reply=True)
except (UserNotFoundError, CookiesNotFoundError):
logger.warning(f"用户 {user.full_name}[{user.id}] 账号信息未找到")
except NeedChallenge:
await message.reply_text("回调错误,请重新签到", allow_sending_without_reply=True)