PamGram/plugins/tools/challenge.py
2024-05-31 10:03:19 +08:00

115 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from typing import Tuple, Optional
from simnet import Region
from simnet.client.cookies import Cookies
from simnet.errors import NeedChallenge
from core.basemodel import RegionEnum
from core.dependence.redisdb import RedisDB
from core.plugin import Plugin
from core.services.cookies import CookiesService
from core.services.players import PlayersService
from modules.apihelper.client.components.verify import Verify
from modules.apihelper.error import ResponseException, APIHelperException
from plugins.tools.genshin import PlayerNotFoundError, CookiesNotFoundError, GenshinHelper
from utils.log import logger
__all__ = ("ChallengeSystemException", "ChallengeSystem")
class ChallengeSystemException(Exception):
def __init__(self, message: str):
self.message = message
super().__init__()
class ChallengeSystem(Plugin):
def __init__(
self,
cookies_service: CookiesService,
redis: RedisDB,
genshin_helper: GenshinHelper,
player: PlayersService,
) -> None:
self.cookies_service = cookies_service
self.genshin_helper = genshin_helper
self.cache = redis.client
self.qname = "plugin:challenge:"
self.players_service = player
async def get_challenge(self, uid: int) -> Tuple[Optional[str], Optional[str]]:
data = await self.cache.get(f"{self.qname}{uid}")
if not data:
return None, None
data = data.decode("utf-8").split("|")
return data[0], data[1]
async def set_challenge(self, uid: int, gt: str, challenge: str):
await self.cache.set(f"{self.qname}{uid}", f"{gt}|{challenge}")
await self.cache.expire(f"{self.qname}{uid}", 10 * 60)
async def create_challenge(
self, user_id: int, need_verify: bool = True, ajax: bool = False
) -> Tuple[Optional[int], Optional[str], Optional[str]]:
try:
client = await self.genshin_helper.get_genshin_client(user_id)
except PlayerNotFoundError:
raise ChallengeSystemException("用户未找到")
except CookiesNotFoundError:
raise ChallengeSystemException("无需验è¯<EFBFBD>")
if need_verify:
try:
await client.get_starrail_notes()
except NeedChallenge:
pass
else:
raise ChallengeSystemException("账户正常,无需验è¯<EFBFBD>")
finally:
await client.shutdown()
else:
await client.shutdown()
verify = Verify(client.account_id, cookies=client.cookies, region=client.region)
try:
data = await verify.create()
challenge = data["challenge"]
gt = data["gt"]
except ResponseException as exc:
logger.warning("用户 %s åˆå»ºéªŒè¯<C3A8>失效 APIè¿”åž [%s]%s", user_id, exc.code, exc.message)
raise ChallengeSystemException(f"åˆå»ºéªŒè¯<EFBFBD>失败 错误信æ<C2A1>¯ä¸º [{exc.code}]{exc.message} 请ç¨<C3A7>å<EFBFBD>Žé‡<C3A9>试")
if ajax:
try:
validate = await verify.ajax(referer="https://webstatic.mihoyo.com/", gt=gt, challenge=challenge)
if validate:
await verify.verify(challenge, validate)
return client.player_id, "ajax", "ajax"
except APIHelperException as exc:
logger.warning("用户 %s ajax 验è¯<C3A8>失效 错误信æ<C2A1>¯ä¸º %s", user_id, str(exc))
logger.warning("用户 %s ajax 验è¯<C3A8>失败 é‡<C3A9>æ°ç”³è¯·éªŒè¯<C3A8>", user_id)
return await self.create_challenge(user_id, need_verify, False)
await self.set_challenge(client.player_id, gt, challenge)
return client.player_id, gt, challenge
async def pass_challenge(self, user_id: int, validate: str, challenge: Optional[str] = None) -> bool:
player = await self.players_service.get_player(user_id)
if player is None:
raise ChallengeSystemException("用户未找到")
region = Region.CHINESE
if player.region == RegionEnum.HOYOLAB:
region = Region.OVERSEAS
cookie_model = await self.cookies_service.get(player.user_id, player.account_id, player.region)
if cookie_model is None:
raise ChallengeSystemException("无需验è¯<EFBFBD>")
if challenge is None:
_, challenge = await self.get_challenge(player.player_id)
if challenge is None:
raise ChallengeSystemException("验è¯<EFBFBD>失效 请æ±å·²ç»<C3A7>过期")
verify = Verify(player.account_id, cookies=Cookies(cookie_model.data), region=region)
try:
await verify.verify(challenge=challenge, validate=validate)
except ResponseException as exc:
logger.warning("用户 %s 验è¯<C3A8>失效 APIè¿”åž [%s]%s", user_id, exc.code, exc.message)
if "拼图已过期" in exc.message:
raise ChallengeSystemException("验è¯<EFBFBD>失败,æ¼å¾å·²è¿‡æœŸï¼Œè¯·ç¨<EFBFBD>å<EFBFBD>Žé‡<EFBFBD>试æˆæ´æ<EFBFBD>¢ä½¿ç”¨çŽ¯å¢ƒè¿è¡ŒéªŒè¯<EFBFBD>")
raise ChallengeSystemException(f"验è¯<EFBFBD>失败,错误信æ<EFBFBD>¯ä¸º [{exc.code}]{exc.message},请ç¨<EFBFBD>å<EFBFBD>Žé‡<EFBFBD>试")
return True