mirror of
https://github.com/PaiGramTeam/PaiGram.git
synced 2024-11-16 12:51:35 +00:00
116 lines
5.0 KiB
Python
116 lines
5.0 KiB
Python
from typing import Tuple, Optional
|
||
|
||
from simnet import Region
|
||
from simnet.client.cookies import Cookies
|
||
from simnet.errors import NeedChallenge
|
||
|
||
from core.basemodel import RegionEnum
|
||
from core.dependence.redisdb import RedisDB
|
||
from core.plugin import Plugin
|
||
from core.services.cookies import CookiesService
|
||
from core.services.players import PlayersService
|
||
from modules.apihelper.client.components.verify import Verify
|
||
from modules.apihelper.error import ResponseException, APIHelperException
|
||
from plugins.tools.genshin import PlayerNotFoundError, CookiesNotFoundError, GenshinHelper
|
||
from utils.log import logger
|
||
|
||
__all__ = ("ChallengeSystemException", "ChallengeSystem")
|
||
|
||
|
||
class ChallengeSystemException(Exception):
|
||
def __init__(self, message: str):
|
||
self.message = message
|
||
super().__init__()
|
||
|
||
|
||
class ChallengeSystem(Plugin):
|
||
def __init__(
|
||
self,
|
||
cookies_service: CookiesService,
|
||
redis: RedisDB,
|
||
genshin_helper: GenshinHelper,
|
||
player: PlayersService,
|
||
) -> None:
|
||
self.cookies_service = cookies_service
|
||
self.genshin_helper = genshin_helper
|
||
self.cache = redis.client
|
||
self.qname = "plugin:challenge:"
|
||
self.players_service = player
|
||
|
||
async def get_challenge(self, uid: int) -> Tuple[Optional[str], Optional[str]]:
|
||
data = await self.cache.get(f"{self.qname}{uid}")
|
||
if not data:
|
||
return None, None
|
||
data = data.decode("utf-8").split("|")
|
||
return data[0], data[1]
|
||
|
||
async def set_challenge(self, uid: int, gt: str, challenge: str):
|
||
await self.cache.set(f"{self.qname}{uid}", f"{gt}|{challenge}")
|
||
await self.cache.expire(f"{self.qname}{uid}", 10 * 60)
|
||
|
||
async def create_challenge(
|
||
self, user_id: int, need_verify: bool = True, ajax: bool = False
|
||
) -> Tuple[Optional[int], Optional[str], Optional[str]]:
|
||
try:
|
||
client = await self.genshin_helper.get_genshin_client(user_id)
|
||
except PlayerNotFoundError:
|
||
raise ChallengeSystemException("用户未找到")
|
||
except CookiesNotFoundError:
|
||
raise ChallengeSystemException("æ— éœ€éªŒè¯<EFBFBD>")
|
||
if client.region != Region.CHINESE:
|
||
raise ChallengeSystemException("é<EFBFBD>žæ³•ç”¨æˆ·")
|
||
if need_verify:
|
||
try:
|
||
await client.get_genshin_notes()
|
||
except NeedChallenge:
|
||
pass
|
||
else:
|
||
raise ChallengeSystemException("账户æ£å¸¸ï¼Œæ— 需验è¯<EFBFBD>")
|
||
finally:
|
||
await client.shutdown()
|
||
else:
|
||
await client.shutdown()
|
||
verify = Verify(cookies=client.cookies)
|
||
try:
|
||
data = await verify.create()
|
||
challenge = data["challenge"]
|
||
gt = data["gt"]
|
||
except ResponseException as exc:
|
||
logger.warning("用户 %s 创建验è¯<C3A8>失效 API返回 [%s]%s", user_id, exc.code, exc.message)
|
||
raise ChallengeSystemException(f"创建验è¯<EFBFBD>失败 错误信æ<C2A1>¯ä¸º [{exc.code}]{exc.message} 请ç¨<C3A7>å<EFBFBD>Žé‡<C3A9>试")
|
||
if ajax:
|
||
try:
|
||
validate = await verify.ajax(referer="https://webstatic.mihoyo.com/", gt=gt, challenge=challenge)
|
||
if validate:
|
||
await verify.verify(challenge, validate)
|
||
return client.player_id, "ajax", "ajax"
|
||
except APIHelperException as exc:
|
||
logger.warning("用户 %s ajax 验è¯<C3A8>失效 错误信æ<C2A1>¯ä¸º %s", user_id, str(exc))
|
||
logger.warning("用户 %s ajax 验è¯<C3A8>失败 é‡<C3A9>新申请验è¯<C3A8>", user_id)
|
||
return await self.create_challenge(user_id, need_verify, False)
|
||
await self.set_challenge(client.player_id, gt, challenge)
|
||
return client.player_id, gt, challenge
|
||
|
||
async def pass_challenge(self, user_id: int, validate: str, challenge: Optional[str] = None) -> bool:
|
||
player = await self.players_service.get_player(user_id)
|
||
if player is None:
|
||
raise ChallengeSystemException("用户未找到")
|
||
if player.region != RegionEnum.HYPERION:
|
||
raise ChallengeSystemException("é<EFBFBD>žæ³•ç”¨æˆ·")
|
||
cookie_model = await self.cookies_service.get(player.user_id, player.account_id, player.region)
|
||
if cookie_model is None:
|
||
raise ChallengeSystemException("æ— éœ€éªŒè¯<EFBFBD>")
|
||
if challenge is None:
|
||
_, challenge = await self.get_challenge(player.player_id)
|
||
if challenge is None:
|
||
raise ChallengeSystemException("验è¯<EFBFBD>失效 请求已ç»<C3A7>过期")
|
||
verify = Verify(cookies=Cookies(cookie_model.data))
|
||
try:
|
||
await verify.verify(challenge=challenge, validate=validate)
|
||
except ResponseException as exc:
|
||
logger.warning("用户 %s 验è¯<C3A8>失效 API返回 [%s]%s", user_id, exc.code, exc.message)
|
||
if "拼图已过期" in exc.message:
|
||
raise ChallengeSystemException("验è¯<EFBFBD>失败,拼图已过期,请ç¨<EFBFBD>å<EFBFBD>Žé‡<EFBFBD>试或更æ<EFBFBD>¢ä½¿ç”¨çŽ¯å¢ƒè¿›è¡ŒéªŒè¯<EFBFBD>")
|
||
raise ChallengeSystemException(f"验è¯<EFBFBD>失败,错误信æ<EFBFBD>¯ä¸º [{exc.code}]{exc.message},请ç¨<EFBFBD>å<EFBFBD>Žé‡<EFBFBD>试")
|
||
return True
|