♻ Refactor handling of sign-in verification

重构每日签到的验证码处理
This commit is contained in:
洛水居室 2022-11-17 16:07:18 +08:00
parent 684cba06a3
commit b7d2e1962a
No known key found for this signature in database
GPG Key ID: C9DE87DA724B88FC
3 changed files with 186 additions and 125 deletions

View File

@ -1,11 +1,9 @@
import asyncio
import datetime
import json
import random
import re
import time
from json import JSONDecodeError
from typing import Optional, Dict, Tuple
from typing import Optional, Tuple
from genshin import Game, GenshinException, AlreadyClaimed, Client
from genshin.utility import recognize_genshin_server
@ -27,6 +25,7 @@ from core.sign.models import Sign as SignUser, SignStatusEnum
from core.sign.services import SignServices
from core.user.error import UserNotFoundError
from core.user.services import UserService
from modules.apihelper.hyperion import Verification
from utils.bot import get_all_args
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
@ -43,9 +42,15 @@ class NeedChallenge(Exception):
class SignSystem:
REFERER = (
"https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?"
"bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon"
)
def __init__(self, redis: RedisDB):
self.cache = redis.client
self.qname = "plugin:sign:"
self.verification = Verification()
async def get_challenge(self, uid: int) -> Tuple[Optional[str], Optional[str]]:
data = await self.cache.get(f"{self.qname}{uid}")
@ -58,16 +63,6 @@ class SignSystem:
await self.cache.set(f"{self.qname}{uid}", f"{gt}|{challenge}")
await self.cache.expire(f"{self.qname}{uid}", 10 * 60)
async def gen_challenge_header(self, uid: int, validate: str) -> Optional[Dict]:
_, challenge = await self.get_challenge(uid)
if not challenge or not validate:
return
return {
"x-rpc-challenge": challenge,
"x-rpc-validate": validate,
"x-rpc-seccode": f"{validate}|jordan",
}
async def get_challenge_button(
self, uid: int, user_id: int, gt: Optional[str] = None, challenge: Optional[str] = None, callback: bool = True
) -> Optional[InlineKeyboardMarkup]:
@ -86,72 +81,10 @@ class SignSystem:
url = f"{config.pass_challenge_user_web}?username={bot.app.bot.username}&command=sign&gt={gt}&challenge={challenge}&uid={uid}"
return InlineKeyboardMarkup([[InlineKeyboardButton("请尽快点我进行手动验证", url=url)]])
@staticmethod
async def pass_challenge(gt: str, challenge: str, referer: str = None) -> Optional[Dict]:
"""尝试自动通过验证,感谢项目 AutoMihoyoBBS 的贡献者 和 @coolxitech 大佬提供的方案
https://github.com/Womsxd/AutoMihoyoBBS
https://github.com/coolxitech/mihoyo
"""
if not gt or not challenge:
return None
async def recognize(self, gt: str, challenge: str, referer: str = None) -> Optional[str]:
if not referer:
referer = (
"https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?"
"bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon"
)
header = {
"Accept": "*/*",
"X-Requested-With": "com.mihoyo.hyperion",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/107.0.0.0 Safari/537.36",
"Referer": referer,
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
}
# ajax auto pass
try:
async with AsyncClient() as client:
# gt={gt}&challenge={challenge}&lang=zh-cn&pt=3
# client_type=web_mobile&callback=geetest_{int(time.time() * 1000)}
req = await client.get(
"https://api.geetest.com/ajax.php",
params={
"gt": gt,
"challenge": challenge,
"lang": "zh-cn",
"pt": 3,
"client_type": "web_mobile",
"callback": f"geetest_{int(time.time() * 1000)}",
},
headers=header,
timeout=30,
)
text = req.text
logger.debug(f"ajax 返回:%s", text)
if req.status_code != 200:
raise RuntimeError
text = re.findall(r"^.*?\((\{.*?)\)$", text)[0]
data = json.loads(text)
if "success" in data["status"] and "success" in data["data"]["result"]:
logger.info("签到 ajax 请求成功")
return {
"x-rpc-challenge": challenge,
"x-rpc-validate": data["data"]["validate"],
"x-rpc-seccode": f'{data["data"]["validate"]}|jordan',
}
except JSONDecodeError:
logger.warning("签到 ajax 请求 JSON 解析失败")
except TimeoutException as exc:
logger.warning("签到 ajax 请求超时")
if not config.pass_challenge_api:
raise exc
except (KeyError, IndexError):
logger.warning("签到 ajax 请求数据错误")
except RuntimeError:
logger.warning("签到 ajax 请求错误")
logger.warning("签到 ajax 请求失败")
if not config.pass_challenge_api:
referer = self.REFERER
if not gt or not challenge:
return None
pass_challenge_params = {
"gt": gt,
@ -160,42 +93,43 @@ class SignSystem:
}
if config.pass_challenge_app_key:
pass_challenge_params["appkey"] = config.pass_challenge_app_key
# custom api auto pass
headers = {
"Accept": "*/*",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/107.0.0.0 Safari/537.36",
}
try:
async with AsyncClient() as client:
async with AsyncClient(headers=headers) as client:
resp = await client.post(
config.pass_challenge_api,
params=pass_challenge_params,
timeout=60,
)
logger.debug(f"签到 recognize 请求返回:%s", resp.text)
logger.debug("recognize 请求返回:%s", resp.text)
data = resp.json()
status = data.get("status")
if status is not None and status != 0:
logger.error(f"签到 recognize 请求解析错误:[%s]%s", data.get('code'), data.get('msg'))
if status != 0:
logger.error("recognize 解析错误:[%s]%s", data.get('code'), data.get('msg'))
if data.get("code", 0) != 0:
raise RuntimeError
logger.info("签到 recognize 请求 解析成功")
return {
"x-rpc-challenge": data["data"]["challenge"],
"x-rpc-validate": data["data"]["validate"],
"x-rpc-seccode": f'{data["data"]["validate"]}|jordan',
}
logger.info("recognize 解析成功")
return data["data"]["validate"]
except JSONDecodeError:
logger.warning("签到 recognize 请求 JSON 解析失败")
logger.warning("recognize 请求 JSON 解析失败")
except TimeoutException as exc:
logger.warning("签到 recognize 请求超时")
logger.warning("recognize 请求超时")
raise exc
except KeyError:
logger.warning("签到 recognize 请求数据错误")
logger.warning("recognize 请求数据错误")
except RuntimeError:
logger.warning("签到 recognize 请求失败")
logger.warning("recognize 请求失败")
return None
async def start_sign(
self,
client: Client,
headers: Optional[Dict] = None,
challenge: Optional[str] = None,
validate: Optional[str] = None,
is_sleep: bool = False,
is_raise: bool = False,
title: Optional[str] = "签到结果",
@ -208,68 +142,119 @@ class SignSystem:
try:
rewards = await client.get_monthly_rewards(game=Game.GENSHIN, lang="zh-cn")
except GenshinException as error:
logger.warning(f"UID {client.uid} 获取签到信息失败API返回信息为 {str(error)}")
logger.warning("UID[%s] 获取签到信息失败API返回信息为 %s", client.uid, str(error))
if is_raise:
raise error
return f"获取签到信息失败API返回信息为 {str(error)}"
try:
daily_reward_info = await client.get_reward_info(game=Game.GENSHIN, lang="zh-cn") # 获取签到信息失败
except GenshinException as error:
logger.warning(f"UID {client.uid} 获取签到状态失败API返回信息为 {str(error)}")
logger.warning("UID[%s] 获取签到状态失败API返回信息为 %s", client.uid, str(error))
if is_raise:
raise error
return f"获取签到状态失败API返回信息为 {str(error)}"
if not daily_reward_info.signed_in:
try:
if validate:
logger.info("UID[%s] 正在使用 challenge[%s] validate[%s] 通过验证码", client.uid, challenge, validate)
request_daily_reward = await client.request_daily_reward(
"sign",
method="POST",
game=Game.GENSHIN,
lang="zh-cn",
headers=headers,
challenge=challenge,
validate=validate,
)
logger.debug("request_daily_reward 返回 %s", request_daily_reward)
if request_daily_reward and request_daily_reward.get("success", 0) == 1:
# 尝试通过 ajax 请求绕过签到
headers = await self.pass_challenge(
request_daily_reward.get("gt", ""),
request_daily_reward.get("challenge", ""),
gt = request_daily_reward.get("gt", "")
challenge = request_daily_reward.get("challenge", "")
logger.warning("UID[%s] 触发验证码 challenge[%s] gt[%s]", client.uid, challenge, gt)
validate = await self.verification.ajax(
referer=self.REFERER,
gt=gt,
challenge=challenge,
)
request_daily_reward = await client.request_daily_reward(
"sign",
method="POST",
game=Game.GENSHIN,
lang="zh-cn",
headers=headers,
)
if request_daily_reward and request_daily_reward.get("success", 0) == 1:
# 如果绕过失败 抛出异常 相关信息写入
raise NeedChallenge(
uid=client.uid,
gt=request_daily_reward.get("gt", ""),
challenge=request_daily_reward.get("challenge", ""),
if validate:
logger.success("ajax 通过验证成功 challenge[%s] validate[%s]", challenge, validate)
request_daily_reward = await client.request_daily_reward(
"sign",
method="POST",
game=Game.GENSHIN,
lang="zh-cn",
challenge=challenge,
validate=validate,
)
logger.info(f"UID {client.uid} 签到成功")
logger.debug("request_daily_reward 返回 %s", request_daily_reward)
if request_daily_reward and request_daily_reward.get("success", 0) == 1:
logger.warning("UID[%s] 触发验证码 challenge[%s]", client.uid, challenge)
raise NeedChallenge(
uid=client.uid,
gt=request_daily_reward.get("gt", ""),
challenge=request_daily_reward.get("challenge", ""),
)
elif config.pass_challenge_app_key:
# 如果无法绕过 检查配置文件是否配置识别 API 尝试请求绕过
# 注意 需要重新获取没有进行任何请求的 Challenge
logger.info("正在为 recognize 重新请求签到")
_request_daily_reward = await client.request_daily_reward(
"sign",
method="POST",
game=Game.GENSHIN,
lang="zh-cn",
)
logger.debug("request_daily_reward 返回 %s", _request_daily_reward)
if _request_daily_reward and _request_daily_reward.get("success", 0) == 1:
_gt = _request_daily_reward.get("gt", "")
_challenge = _request_daily_reward.get("challenge", "")
_validate = await self.recognize(_gt, _challenge)
if _validate:
logger.success("recognize 通过验证成功 challenge[%s] validate[%s]", _challenge, _validate)
request_daily_reward = await client.request_daily_reward(
"sign",
method="POST",
game=Game.GENSHIN,
lang="zh-cn",
challenge=_challenge,
validate=_validate,
)
if request_daily_reward and request_daily_reward.get("success", 0) == 1:
logger.warning("UID[%s] 触发验证码 challenge[%s]", client.uid, challenge)
raise NeedChallenge(
uid=client.uid,
gt=request_daily_reward.get("gt", ""),
challenge=request_daily_reward.get("challenge", ""),
)
else:
logger.success("UID[%s] 通过 recognize 签到成功", client.uid)
else:
raise NeedChallenge(uid=client.uid, gt=gt, challenge=challenge)
else:
raise NeedChallenge(uid=client.uid, gt=gt, challenge=challenge)
else:
logger.success("UID[%s] 签到成功", client.uid)
except TimeoutException as error:
logger.warning("UID[%s] 签到请求超时", client.uid)
if is_raise:
raise error
return "签到失败了呜呜呜 ~ 服务器连接超时 服务器熟啦 ~ "
except AlreadyClaimed as error:
logger.info(f"UID {client.uid} 已经签到")
logger.warning("UID[%s] 已经签到", client.uid)
if is_raise:
raise error
result = "今天旅行者已经签到过了~"
except GenshinException as error:
logger.warning(f"UID {client.uid} 签到失败API返回信息为 {str(error)}")
logger.warning("UID %s 签到失败API返回信息为 %s", client.uid, str(error))
if is_raise:
raise error
return f"获取签到状态失败API返回信息为 {str(error)}"
else:
logger.info(f"UID {client.uid} 签到成功")
result = "OK"
else:
logger.info(f"UID {client.uid} 已经签到")
logger.info("UID[%s] 已经签到", client.uid)
result = "今天旅行者已经签到过了~"
logger.info(f"UID {client.uid} 签到结果 {result}")
logger.info("UID[%s] 签到结果 %s", client.uid, result)
reward = rewards[daily_reward_info.claimed_rewards - (1 if daily_reward_info.signed_in else 0)]
today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
cn_timezone = datetime.timezone(datetime.timedelta(hours=8))
@ -370,8 +355,8 @@ class Sign(Plugin, BasePlugin):
try:
client = await get_genshin_client(user.id)
await message.reply_chat_action(ChatAction.TYPING)
headers = await self.system.gen_challenge_header(client.uid, validate)
sign_text = await self.system.start_sign(client, headers)
_, challenge = await self.system.get_challenge(client.uid)
sign_text = await self.system.start_sign(client, challenge=challenge, validate=validate)
reply_message = await message.reply_text(sign_text, allow_sending_without_reply=True)
if filters.ChatType.GROUPS.filter(reply_message):
self._add_delete_message_job(context, reply_message.chat_id, reply_message.message_id)

View File

@ -111,11 +111,11 @@ class StartPlugin(Plugin):
try:
client = await get_genshin_client(user.id)
await message.reply_chat_action(ChatAction.TYPING)
headers = await self.sign_system.gen_challenge_header(client.uid, validate)
if not headers:
_, challenge = await self.sign_system.get_challenge(client.uid)
if not challenge:
await message.reply_text("验证请求已过期。", allow_sending_without_reply=True)
return
sign_text = await self.sign_system.start_sign(client, headers=headers)
sign_text = await self.sign_system.start_sign(client, challenge=challenge, validate=validate)
await message.reply_text(sign_text, allow_sending_without_reply=True)
except (UserNotFoundError, CookiesNotFoundError):
logger.warning("用户 %s[%s] 账号信息未找到", user.full_name, user.id)

View File

@ -3,7 +3,7 @@ import typing
import aiohttp.typedefs
import genshin # pylint: disable=W0406
import yarl
from genshin import constants, types
from genshin import constants, types, utility
from genshin.client import routes
from genshin.utility import ds
@ -12,6 +12,7 @@ from utils.patch.methods import patch, patchable
DEVICE_ID = get_device_id()
@patch(genshin.client.components.calculator.CalculatorClient) # noqa
class CalculatorClient:
@patchable
@ -155,3 +156,78 @@ class BaseClient:
await self.cache.set_static(static_cache, response)
return response
@patch(genshin.client.components.daily.DailyRewardClient) # noqa
class DailyRewardClient:
@patchable
async def request_daily_reward(
self,
endpoint: str,
*,
game: typing.Optional[types.Game] = None,
method: str = "GET",
lang: typing.Optional[str] = None,
params: typing.Optional[typing.Mapping[str, typing.Any]] = None,
headers: typing.Optional[aiohttp.typedefs.LooseHeaders] = None,
**kwargs: typing.Any,
) -> typing.Mapping[str, typing.Any]:
"""Make a request towards the daily reward endpoint."""
params = dict(params or {})
headers = dict(headers or {})
if game is None:
if self.default_game is None:
raise RuntimeError("No default game set.")
game = self.default_game
base_url = routes.REWARD_URL.get_url(self.region, game)
url = (base_url / endpoint).update_query(**base_url.query)
if self.region == types.Region.OVERSEAS:
params["lang"] = lang or self.lang
elif self.region == types.Region.CHINESE:
# TODO: Support cn honkai
player_id = await self._get_uid(types.Game.GENSHIN)
params["uid"] = player_id
params["region"] = utility.recognize_genshin_server(player_id)
account_id = self.cookie_manager.get_user_id()
if account_id:
device_id = hex_digest(str(account_id))
else:
device_id = DEVICE_ID
if endpoint == "sign":
_app_version, _client_type, _ds = get_ds()
else:
_app_version, _client_type, _ds = get_ds(new_ds=True, params=params)
ua = get_ua(device="Paimon Build " + device_id[0:5], version=_app_version)
headers["User-Agent"] = ua
headers["X_Requested_With"] = "com.mihoyo.hoyolab"
headers["Referer"] = (
"https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?"
"bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon"
)
headers["x-rpc-device_id"] = get_device_id(ua)
headers["x-rpc-app_version"] = _app_version
headers["x-rpc-client_type"] = _client_type
headers["ds"] = _ds
validate = kwargs.get("validate")
challenge = kwargs.get("challenge")
if validate and challenge:
headers["x-rpc-challenge"] = challenge
headers["x-rpc-validate"] = validate
headers["x-rpc-seccode"] = f"{validate}|jordan"
kwargs.pop("challenge", None)
kwargs.pop("validate", None)
else:
raise TypeError(f"{self.region!r} is not a valid region.")
return await self.request(url, method=method, params=params, headers=headers, **kwargs)