Add Webapp

This commit is contained in:
洛水居室 2022-11-20 19:32:56 +08:00 committed by GitHub
parent d096cd29a5
commit 2b83985efd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 190 additions and 85 deletions

View File

@ -405,9 +405,9 @@ class Verification:
def get_url(host: str, url: str): def get_url(host: str, url: str):
return f"https://{host}{url}" return f"https://{host}{url}"
async def create(self): async def create(self, is_high: bool = False):
url = self.get_url(self.HOST, self.CREATE_VERIFICATION_URL) url = self.get_url(self.HOST, self.CREATE_VERIFICATION_URL)
params = {"is_high": "true"} params = {"is_high": "true" if is_high else "false"}
headers = self.get_headers(params=params) headers = self.get_headers(params=params)
response = await self.client.get(url, params=params, headers=headers) response = await self.client.get(url, params=params, headers=headers)
return response return response

View File

@ -1,7 +1,7 @@
from typing import Tuple, Optional from typing import Tuple, Optional
from genshin import Region, GenshinException from genshin import Region, GenshinException
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, WebAppInfo, KeyboardButton, ReplyKeyboardMarkup
from telegram.ext import CallbackContext from telegram.ext import CallbackContext
from core.base.redisdb import RedisDB from core.base.redisdb import RedisDB
@ -12,7 +12,7 @@ from core.cookies.error import CookiesNotFoundError
from core.plugin import Plugin, handler from core.plugin import Plugin, handler
from core.user import UserService from core.user import UserService
from core.user.error import UserNotFoundError from core.user.error import UserNotFoundError
from modules.apihelper.error import ResponseException, APIHelperException from modules.apihelper.error import ResponseException
from modules.apihelper.hyperion import Verification from modules.apihelper.hyperion import Verification
from utils.decorators.error import error_callable from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts from utils.decorators.restricts import restricts
@ -61,33 +61,21 @@ class VerificationPlugins(Plugin, BasePlugin):
except CookiesNotFoundError: except CookiesNotFoundError:
await message.reply_text("检测到用户为UID绑定无需认证") await message.reply_text("检测到用户为UID绑定无需认证")
return return
is_high: bool = False
verification = Verification(cookies=client.cookie_manager.cookies) verification = Verification(cookies=client.cookie_manager.cookies)
if context.args and len(context.args) > 0: if not context.args:
validate = context.args[0] try:
_, challenge = await self.system.get_challenge(client.uid) await client.get_genshin_notes()
logger.info("用户 %s[%s] 请求通过认证 challenge[%s] validate[%s] ", user.full_name, user.id, challenge, validate) except GenshinException as exc:
if challenge: if exc.retcode == 1034:
try: is_high = True
await verification.verify(challenge, validate) else:
logger.success("用户 %s[%s] 验证成功", user.full_name, user.id) raise exc
await message.reply_text("验证成功")
except ResponseException as exc:
logger.warning("用户 %s[%s] 验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message)
await message.reply_text(f"验证失败 错误信息为 [{exc.code}]{exc.message}")
else: else:
logger.warning("用户 %s[%s] 验证失效 请求已经过期", user.full_name, user.id) await message.reply_text("账户正常,无需认证")
await message.reply_text("验证失效 请求已经过期 请稍后重试") return
return
try: try:
await client.get_genshin_notes() data = await verification.create(is_high=is_high)
except GenshinException as exc:
if exc.retcode != 1034:
raise exc
else:
await message.reply_text("账户正常,无需认证")
return
try:
data = await verification.create()
challenge = data["challenge"] challenge = data["challenge"]
gt = data["gt"] gt = data["gt"]
logger.success("用户 %s[%s] 创建验证成功 gt[%s] challenge[%s]", user.full_name, user.id, gt, challenge) logger.success("用户 %s[%s] 创建验证成功 gt[%s] challenge[%s]", user.full_name, user.id, gt, challenge)
@ -95,16 +83,14 @@ class VerificationPlugins(Plugin, BasePlugin):
logger.warning("用户 %s[%s] 创建验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message) logger.warning("用户 %s[%s] 创建验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message)
await message.reply_text(f"创建验证失败 错误信息为 [{exc.code}]{exc.message} 请稍后重试") await message.reply_text(f"创建验证失败 错误信息为 [{exc.code}]{exc.message} 请稍后重试")
return return
try:
validate = await verification.ajax(referer="https://webstatic.mihoyo.com/", gt=gt, challenge=challenge)
if validate:
await verification.verify(challenge, validate)
logger.success(f"用户 %s[%s] 通过 ajax 验证", user.full_name, user.id)
await message.reply_text("验证成功")
return
except APIHelperException as exc:
logger.warning(f"用户 %s[%s] ajax 验证失效 错误信息为 %s", user.full_name, user.id, repr(exc))
await self.system.set_challenge(client.uid, gt, challenge) await self.system.set_challenge(client.uid, gt, challenge)
url = f"{config.pass_challenge_user_web}?username={context.bot.username}&command=verify&gt={gt}&challenge={challenge}&uid={client.uid}" url = f"{config.pass_challenge_user_web}/webapp?username={context.bot.username}&command=verify&gt={gt}&challenge={challenge}&uid={client.uid}"
button = InlineKeyboardMarkup([[InlineKeyboardButton("验证", url=url)]]) await message.reply_text(
await message.reply_text("请尽快点击下方手动验证", reply_markup=button) "请尽快在10秒内完成手动验证\n或发送 /web_cancel 取消操作",
reply_markup=ReplyKeyboardMarkup.from_button(
KeyboardButton(
text="点我手动验证",
web_app=WebAppInfo(url=url),
)
),
)

View File

@ -1,7 +1,7 @@
from typing import Optional from typing import Optional
from genshin import Region, GenshinException from genshin import Region, GenshinException
from telegram import Update, ReplyKeyboardRemove, Message, User, InlineKeyboardMarkup, InlineKeyboardButton from telegram import Update, ReplyKeyboardRemove, Message, User, WebAppInfo, ReplyKeyboardMarkup, KeyboardButton
from telegram.constants import ChatAction from telegram.constants import ChatAction
from telegram.ext import CallbackContext, CommandHandler from telegram.ext import CallbackContext, CommandHandler
from telegram.helpers import escape_markdown from telegram.helpers import escape_markdown
@ -66,9 +66,6 @@ class StartPlugin(Plugin):
if _command == "sign": if _command == "sign":
logger.info(f"用户 %s[%s] 通过start命令 进入签到流程", user.full_name, user.id) logger.info(f"用户 %s[%s] 通过start命令 进入签到流程", user.full_name, user.id)
await self.process_sign_validate(message, user, _challenge) await self.process_sign_validate(message, user, _challenge)
elif _command == "verify":
logger.info(f"用户 %s[%s] 通过start命令 进入认证流程", user.full_name, user.id)
await self.process_validate(message, user, validate=_challenge)
else: else:
await message.reply_html(f"你好 {user.mention_html()} !我是派蒙 \n请点击 /{args[0]} 命令进入对应流程") await message.reply_html(f"你好 {user.mention_html()} !我是派蒙 \n请点击 /{args[0]} 命令进入对应流程")
return return
@ -122,9 +119,7 @@ class StartPlugin(Plugin):
except NeedChallenge: except NeedChallenge:
await message.reply_text("回调错误,请重新签到", allow_sending_without_reply=True) await message.reply_text("回调错误,请重新签到", allow_sending_without_reply=True)
async def process_validate( async def process_validate(self, message: Message, user: User, bot_username: Optional[str] = None):
self, message: Message, user: User, validate: Optional[str] = None, bot_username: Optional[str] = None
):
try: try:
client = await get_genshin_client(user.id) client = await get_genshin_client(user.id)
if client.region != Region.CHINESE: if client.region != Region.CHINESE:
@ -150,43 +145,32 @@ class StartPlugin(Plugin):
"在暂停使用期间依然出现频繁认证,建议修改密码以保护账号安全。" "在暂停使用期间依然出现频繁认证,建议修改密码以保护账号安全。"
) )
verification = Verification(cookies=client.cookie_manager.cookies) verification = Verification(cookies=client.cookie_manager.cookies)
if validate: try:
_, challenge = await self.verification_system.get_challenge(client.uid) data = await verification.create(is_high=True)
if challenge: challenge = data["challenge"]
logger.info( gt = data["gt"]
"用户 %s[%s] 请求通过认证 challenge[%s] validate[%s] ", user.full_name, user.id, challenge, validate logger.success("用户 %s[%s] 创建验证成功 gt[%s] challenge[%s]", user.full_name, user.id, gt, challenge)
) except ResponseException as exc:
try: logger.warning("用户 %s[%s] 创建验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message)
await verification.verify(challenge, validate) await message.reply_text(f"验证失败 错误信息为 [{exc.code}]{exc.message}")
logger.success("用户 %s[%s] 验证成功", user.full_name, user.id)
await message.reply_text("验证成功")
except ResponseException as exc:
logger.warning("用户 %s[%s] 验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message)
await message.reply_text(f"验证失败 错误信息为 [{exc.code}]{exc.message} 请稍后重试")
else:
logger.warning("用户 %s[%s] 验证失效 请求已经过期", user.full_name, user.id)
await message.reply_text("验证失效 请求已经过期 请稍后重试")
return return
if bot_username: try:
try: validate = await verification.ajax(referer="https://webstatic.mihoyo.com/", gt=gt, challenge=challenge)
data = await verification.create() if validate:
challenge = data["challenge"] await verification.verify(challenge, validate)
gt = data["gt"] logger.success("用户 %s[%s] 通过 ajax 验证", user.full_name, user.id)
logger.success("用户 %s[%s] 创建验证成功 gt[%s] challenge[%s]", user.full_name, user.id, gt, challenge) await message.reply_text("验证成功")
except ResponseException as exc:
logger.warning("用户 %s[%s] 创建验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message)
await message.reply_text(f"验证失败 错误信息为 [{exc.code}]{exc.message}")
return return
try: except APIHelperException as exc:
validate = await verification.ajax(referer="https://webstatic.mihoyo.com/", gt=gt, challenge=challenge) logger.warning("用户 %s[%s] ajax 验证失效 错误信息为 %s", user.full_name, user.id, repr(exc))
if validate: await self.verification_system.set_challenge(client.uid, gt, challenge)
await verification.verify(challenge, validate) url = f"{config.pass_challenge_user_web}/webapp?username={bot_username}&command=verify&gt={gt}&challenge={challenge}&uid={client.uid}"
logger.success("用户 %s[%s] 通过 ajax 验证", user.full_name, user.id) await message.reply_text(
await message.reply_text("验证成功") "请尽快在10秒内完成手动验证\n或发送 /web_cancel 取消操作",
return reply_markup=ReplyKeyboardMarkup.from_button(
except APIHelperException as exc: KeyboardButton(
logger.warning("用户 %s[%s] ajax 验证失效 错误信息为 %s", user.full_name, user.id, repr(exc)) text="点我手动验证",
await self.verification_system.set_challenge(client.uid, gt, challenge) web_app=WebAppInfo(url=url),
url = f"{config.pass_challenge_user_web}?username={bot_username}&command=verify&gt={gt}&challenge={challenge}&uid={client.uid}" )
button = InlineKeyboardMarkup([[InlineKeyboardButton("验证", url=url)]]) ),
await message.reply_text("请尽快点击下方手动验证", reply_markup=button) )

135
plugins/system/webapp.py Normal file
View File

@ -0,0 +1,135 @@
from genshin import Region, GenshinException
from pydantic import BaseModel
from telegram import ReplyKeyboardRemove, Update, WebAppInfo, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import CallbackContext, filters
from core.base.redisdb import RedisDB
from core.config import config
from core.cookies import CookiesService
from core.cookies.error import CookiesNotFoundError
from core.plugin import Plugin, handler
from core.user import UserService
from core.user.error import UserNotFoundError
from modules.apihelper.error import ResponseException
from modules.apihelper.hyperion import Verification
from plugins.genshin.verification import VerificationSystem
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
from utils.log import logger
class WebAppData(BaseModel):
path: str
data: dict
code: int
message: str
class WebAppDataException(Exception):
def __init__(self, data):
self.data = data
super().__init__()
class WebApp(Plugin):
def __init__(self, user_service: UserService = None, cookies_service: CookiesService = None, redis: RedisDB = None):
self.cookies_service = cookies_service
self.user_service = user_service
self.verification_system = VerificationSystem(redis)
@staticmethod
def de_web_app_data(data: str) -> WebAppData:
try:
return WebAppData.parse_raw(data)
except Exception as exc:
raise WebAppDataException(data) from exc
@handler.message(filters=filters.StatusUpdate.WEB_APP_DATA, block=False)
@restricts()
async def app(self, update: Update, context: CallbackContext):
user = update.effective_user
message = update.effective_message
web_app_data = message.web_app_data
if web_app_data:
logger.info("用户 %s[%s] 触发 WEB_APP_DATA 请求", user.full_name, user.id)
result = self.de_web_app_data(web_app_data.data)
logger.debug("path:%s\ndata:%s\ncode:%s\nmessage:%s", result.path, result.data, result.code, result.message)
if result.code == 0:
if result.path == "verify":
validate = result.data.get("geetest_validate")
try:
client = await get_genshin_client(user.id)
if client.region != Region.CHINESE:
await message.reply_text("非法用户", reply_markup=ReplyKeyboardRemove())
return
except UserNotFoundError:
await message.reply_text("用户未找到", reply_markup=ReplyKeyboardRemove())
return
except CookiesNotFoundError:
await message.reply_text("检测到用户为UID绑定无需认证", reply_markup=ReplyKeyboardRemove())
return
verification = Verification(cookies=client.cookie_manager.cookies)
if validate:
_, challenge = await self.verification_system.get_challenge(client.uid)
if challenge:
logger.info(
"用户 %s[%s] 请求通过认证 challenge[%s] validate[%s] ",
user.full_name,
user.id,
challenge,
validate,
)
try:
await verification.verify(challenge=challenge, validate=validate)
logger.success("用户 %s[%s] 验证成功", user.full_name, user.id)
await message.reply_text("验证成功", reply_markup=ReplyKeyboardRemove())
except ResponseException as exc:
logger.warning(
"用户 %s[%s] 验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message
)
await message.reply_text(f"验证失败 错误信息为 [{exc.code}]{exc.message} 请稍后重试", reply_markup=ReplyKeyboardRemove())
else:
logger.warning("用户 %s[%s] 验证失效 请求已经过期", user.full_name, user.id)
await message.reply_text("验证失效 请求已经过期 请稍后重试", reply_markup=ReplyKeyboardRemove())
return
try:
await client.get_genshin_notes()
except GenshinException as exc:
if exc.retcode != 1034:
raise exc
else:
await message.reply_text("账户正常,无需认证")
return
try:
data = await verification.create(is_high=True)
challenge = data["challenge"]
gt = data["gt"]
logger.success("用户 %s[%s] 创建验证成功 gt[%s] challenge[%s]", user.full_name, user.id, gt, challenge)
except ResponseException as exc:
logger.warning("用户 %s[%s] 创建验证失效 API返回 [%s]%s", user.full_name, user.id, exc.code, exc.message)
await message.reply_text(f"创建验证失败 错误信息为 [{exc.code}]{exc.message} 请稍后重试", reply_markup=ReplyKeyboardRemove())
return
await self.verification_system.set_challenge(client.uid, gt, challenge)
url = f"{config.pass_challenge_user_web}/webapp?username={context.bot.username}&command=verify&gt={gt}&challenge={challenge}&uid={client.uid}"
await message.reply_text(
"请尽快点击下方手动验证 或发送 /web_cancel 取消操作",
reply_markup=ReplyKeyboardMarkup.from_button(
KeyboardButton(
text="点我手动验证",
web_app=WebAppInfo(url=url),
)
),
)
else:
logger.warning(
"用户 %s[%s] WEB_APP_DATA 请求错误 [%s]%s", user.full_name, user.id, result.code, result.message
)
await message.reply_text(result.message, reply_markup=ReplyKeyboardRemove())
else:
logger.warning("用户 %s[%s] WEB_APP_DATA 非法数据", user.full_name, user.id)
@handler.command("web_cancel", block=False)
@restricts()
async def web_cancel(self, update: Update, _: CallbackContext) -> None:
message = update.effective_message
await message.reply_text("取消操作", reply_markup=ReplyKeyboardRemove())