# PamGram/modules/apihelper/client/components/verify.py
#
# 99 lines
# 3.7 KiB
# Python
# Raw Normal View History
#
# 2022-12-10 12:37:43 +00:00
import json
import re
import time
from typing import Dict, Optional, Union
from httpx import Cookies
# 2022-12-10 12:37:43 +00:00
from ..base.hyperionrequest import HyperionRequest
# 2023-06-17 03:48:36 +00:00
from ...utility.devices import devices_methods
from ...utility.helpers import get_ua, get_ds
# 2022-12-10 12:37:43 +00:00
# Names exported by ``from <this module> import *``; only the Verify class is public.
__all__ = ("Verify",)
class Verify:
    """Client for the game-record verification (Geetest challenge) endpoints.

    Requests to ``HOST`` are signed with a DS header and sent through a
    ``HyperionRequest`` client; requests to ``VERIFICATION_HOST`` use the
    lighter ``VERIFICATION_HEADERS`` set plus a caller-supplied referer.
    """

    HOST = "api-takumi-record.mihoyo.com"
    VERIFICATION_HOST = "api.geetest.com"
    CREATE_VERIFICATION_URL = "/game_record/app/card/wapi/createVerification"
    VERIFY_VERIFICATION_URL = "/game_record/app/card/wapi/verifyVerification"
    REFERER_URL = "/game_record/app/hkrpg/api/note"
    # Sent as the ``x-rpc-challenge_game`` header.
    # NOTE(review): presumably the game id matching the hkrpg referer path -- confirm.
    GAME = "6"
    AJAX_URL = "/ajax.php"
    # Evaluated once, when the class body is executed.
    USER_AGENT = get_ua()
    BBS_HEADERS = {
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "User-Agent": USER_AGENT,
        "X-Requested-With": "com.mihoyo.hyperion",
        "Referer": "https://webstatic.mihoyo.com/",
        "x-rpc-page": "3.1.3_#/rpg",
    }
    VERIFICATION_HEADERS = {
        "Accept": "*/*",
        "X-Requested-With": "com.mihoyo.hyperion",
        "User-Agent": USER_AGENT,
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
    }
    # Extracts the JSON object from a JSONP reply of the form ``callback({...})``.
    _JSONP_PATTERN = re.compile(r"^.*?\((\{.*?)\)$")

    def __init__(
        self,
        account_id: Optional[int] = None,
        cookies: Optional[Union[Dict, Cookies]] = None,
    ):
        """Store the account id and build the underlying request client.

        Args:
            account_id: Account identifier, later passed to
                ``devices_methods.update_device_headers``.
            cookies: Cookies handed to ``HyperionRequest``.
        """
        self.account_id = account_id
        self.client = HyperionRequest(headers=self.BBS_HEADERS, cookies=cookies)

    def get_verification_headers(self, referer: str):
        """Return a copy of ``VERIFICATION_HEADERS`` with ``Referer`` set to *referer*."""
        headers = self.VERIFICATION_HEADERS.copy()
        headers["Referer"] = referer
        return headers

    async def get_headers(self, data: Optional[dict] = None, params: Optional[dict] = None):
        """Build the signed headers for a request to ``HOST``.

        Args:
            data: Request body passed to ``get_ds`` for signing.
            params: Query parameters passed to ``get_ds`` for signing.

        Returns:
            A fresh dict based on ``BBS_HEADERS`` with the app version, client
            type, DS and challenge headers filled in.
        """
        headers = self.BBS_HEADERS.copy()
        app_version, client_type, ds = get_ds(new_ds=True, data=data, params=params)
        headers["x-rpc-app_version"] = app_version
        headers["x-rpc-client_type"] = client_type
        headers["DS"] = ds
        headers["x-rpc-challenge_path"] = f"https://{self.HOST}{self.REFERER_URL}"
        headers["x-rpc-challenge_game"] = self.GAME
        # The return value is discarded, so this presumably adds the device
        # headers to ``headers`` in place -- verify against devices_methods.
        await devices_methods.update_device_headers(self.account_id, headers)
        return headers

    @staticmethod
    def get_url(host: str, url: str):
        """Join *host* and the path *url* into an ``https://`` URL."""
        return f"https://{host}{url}"

    async def create(self, is_high: bool = False):
        """Request a new verification via ``CREATE_VERIFICATION_URL``.

        Args:
            is_high: Sent as the ``is_high`` query parameter (``"true"``/``"false"``).

        Returns:
            Whatever ``self.client.get`` returns for the request.
        """
        url = self.get_url(self.HOST, self.CREATE_VERIFICATION_URL)
        params = {"is_high": "true" if is_high else "false"}
        headers = await self.get_headers(params=params)
        response = await self.client.get(url, params=params, headers=headers)
        return response

    async def verify(self, challenge: str, validate: str):
        """Submit a solved challenge via ``VERIFY_VERIFICATION_URL``.

        Args:
            challenge: Sent as ``geetest_challenge``.
            validate: Sent as ``geetest_validate``; also used to build
                ``geetest_seccode``.

        Returns:
            Whatever ``self.client.post`` returns for the request.
        """
        url = self.get_url(self.HOST, self.VERIFY_VERIFICATION_URL)
        data = {"geetest_challenge": challenge, "geetest_validate": validate, "geetest_seccode": f"{validate}|jordan"}
        headers = await self.get_headers(data=data)
        response = await self.client.post(url, json=data, headers=headers)
        return response

    async def ajax(self, referer: str, gt: str, challenge: str) -> Optional[str]:
        """Query the ``AJAX_URL`` endpoint on ``VERIFICATION_HOST`` for a validate token.

        Args:
            referer: Value for the ``Referer`` request header.
            gt: Sent as the ``gt`` query parameter.
            challenge: Sent as the ``challenge`` query parameter.

        Returns:
            The ``validate`` value when both the top-level ``status`` and the
            nested ``data.result`` contain ``"success"``; otherwise ``None``.
            ``None`` is also returned when the reply is not a JSONP-wrapped
            object or lacks the expected keys.

        Raises:
            json.JSONDecodeError: If the JSONP payload is not valid JSON.
        """
        headers = self.get_verification_headers(referer)
        url = self.get_url(self.VERIFICATION_HOST, self.AJAX_URL)
        params = {
            "gt": gt,
            "challenge": challenge,
            "lang": "zh-cn",
            "pt": 3,
            "client_type": "web_mobile",
            # Millisecond timestamp keeps the JSONP callback name unique per call.
            "callback": f"geetest_{int(time.time() * 1000)}",
        }
        response = await self.client.get(url, headers=headers, params=params, de_json=False)
        match = self._JSONP_PATTERN.search(response.text)
        if match is None:
            # Not a ``callback({...})`` reply; nothing to extract.
            return None
        payload = json.loads(match.group(1))
        inner = payload.get("data") or {}
        if "success" in payload.get("status", "") and "success" in inner.get("result", ""):
            return inner.get("validate")
        return None