♻ Refactor ApiHelper

This commit is contained in:
洛水居室 2022-12-10 20:37:43 +08:00 committed by GitHub
parent 0ca5f0314e
commit 4b976f70d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 624 additions and 589 deletions

View File

@ -47,7 +47,7 @@ LOGGER_LOCALS_MAX_DEPTH=0
LOGGER_LOCALS_MAX_LENGTH=10
LOGGER_LOCALS_MAX_STRING=80
# 可被 logger 打印的 record 的名称(默认包含了 LOGGER_NAME
LOGGER_FILTERED_NAMES=["uvicorn","ErrorPush"]
LOGGER_FILTERED_NAMES=["uvicorn","ErrorPush","ApiHelper"]
# 超时配置 可选配置项

View File

@ -1,6 +1,6 @@
from typing import List, Optional
from modules.apihelper.hyperion import Hyperion
from modules.apihelper.client.components.hyperion import Hyperion
from .cache import GameCache

View File

@ -0,0 +1 @@

View File

View File

@ -4,6 +4,8 @@ from typing import Optional, Type
import httpx
__all__ = ("HTTPXRequest",)
class HTTPXRequest(AbstractAsyncContextManager):
def __init__(self, *args, headers=None, **kwargs):

View File

@ -3,12 +3,14 @@ from typing import Union
import httpx
from httpx import Response
from modules.apihelper.error import NetworkException, ResponseException, APIHelperTimedOut
from modules.apihelper.request.httpxrequest import HTTPXRequest
from modules.apihelper.typedefs import POST_DATA, JSON_DATA
from .httpxrequest import HTTPXRequest
from ...error import NetworkException, ResponseException, APIHelperTimedOut
from ...typedefs import POST_DATA, JSON_DATA
__all__ = ("HyperionRequest",)
class HOYORequest(HTTPXRequest):
class HyperionRequest(HTTPXRequest):
async def get(
self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
) -> Union[POST_DATA, JSON_DATA, Response]:

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,45 @@
import time
from typing import List
import httpx
from pydantic import parse_obj_as
from ...models.genshin.abyss import TeamRateResult, TeamRate
__all__ = ("AbyssTeam",)
class AbyssTeam:
TEAM_RATE_API = "https://www.youchuang.fun/gamerole/formationRate"
HEADERS = {
"Host": "www.youchuang.fun",
"Referer": "https://servicewechat.com/wxce4dbe0cb0f764b3/91/page-frame.html",
"User-Agent": "Mozilla/5.0 (iPad; CPU OS 15_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Mobile/15E148 MicroMessenger/8.0.20(0x1800142f) NetType/WIFI Language/zh_CN",
"content-type": "application/json",
}
VERSION = "3.2"
def __init__(self):
self.client = httpx.AsyncClient(headers=self.HEADERS)
self.time = 0
self.data = None
self.ttl = 10 * 60
async def get_data(self) -> TeamRateResult:
if self.data is None or self.time + self.ttl < time.time():
data_up = await self.client.post(self.TEAM_RATE_API, json={"version": self.VERSION, "layer": 1})
data_up_json = data_up.json()["result"]
data_down = await self.client.post(self.TEAM_RATE_API, json={"version": self.VERSION, "layer": 2})
data_down_json = data_down.json()["result"]
self.data = TeamRateResult(
version=self.VERSION,
rate_list_up=parse_obj_as(List[TeamRate], data_up_json["rateList"]),
rate_list_down=parse_obj_as(List[TeamRate], data_down_json["rateList"]),
user_count=data_up_json["userCount"],
)
self.time = time.time()
return self.data.copy(deep=True)
async def close(self):
await self.client.aclose()

View File

@ -0,0 +1,48 @@
import time
from typing import List
from ..base.hyperionrequest import HyperionRequest
from ...models.genshin.gacha import GachaInfo
__all__ = ("Gacha",)
class Gacha:
GACHA_LIST_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/gacha/list.json"
GACHA_INFO_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/%s/zh-cn.json"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/90.0.4430.72 Safari/537.36"
)
def __init__(self):
self.headers = {
"User-Agent": self.USER_AGENT,
}
self.client = HyperionRequest(headers=self.headers)
self.cache = {}
self.cache_ttl = 600
async def get_gacha_list_info(self) -> List[GachaInfo]:
if self.cache.get("time", 0) + self.cache_ttl < time.time():
self.cache.clear()
cache = self.cache.get("gacha_list_info")
if cache is not None:
return cache
req = await self.client.get(self.GACHA_LIST_URL)
data = [GachaInfo(**i) for i in req["list"]]
self.cache["gacha_list_info"] = data
self.cache["time"] = time.time()
return data
async def get_gacha_info(self, gacha_id: str) -> dict:
cache = self.cache.get(gacha_id)
if cache is not None:
return cache
req = await self.client.get(self.GACHA_INFO_URL % gacha_id)
self.cache[gacha_id] = req
return req
async def close(self):
await self.client.shutdown()

View File

@ -0,0 +1,134 @@
import asyncio
import re
from typing import List
from ..base.hyperionrequest import HyperionRequest
from ...models.genshin.hyperion import PostInfo, ArtworkImage
from ...typedefs import JSON_DATA
__all__ = ("Hyperion",)
class Hyperion:
"""米忽悠bbs相关API请求
该名称来源于米忽悠的安卓BBS包名结尾考虑到大部分重要的功能确实是在移动端实现了
"""
POST_FULL_URL = "https://bbs-api.miyoushe.com/post/wapi/getPostFull"
POST_FULL_IN_COLLECTION_URL = "https://bbs-api.miyoushe.com/post/wapi/getPostFullInCollection"
GET_NEW_LIST_URL = "https://bbs-api.miyoushe.com/post/wapi/getNewsList"
GET_OFFICIAL_RECOMMENDED_POSTS_URL = "https://bbs-api.miyoushe.com/post/wapi/getOfficialRecommendedPosts"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/90.0.4430.72 Safari/537.36"
)
def __init__(self):
self.client = HyperionRequest(headers=self.get_headers())
@staticmethod
def extract_post_id(text: str) -> int:
"""
:param text:
# https://bbs.mihoyo.com/ys/article/8808224
# https://m.bbs.mihoyo.com/ys/article/8808224
# https://www.miyoushe.com/ys/article/32497914
# https://m.miyoushe.com/ys/#/article/32497914
:return: post_id
"""
rgx = re.compile(r"(?:bbs|www\.)?(?:miyoushe|mihoyo)\.com/[^.]+/article/(?P<article_id>\d+)")
matches = rgx.search(text)
if matches is None:
return -1
entries = matches.groupdict()
if entries is None:
return -1
try:
art_id = int(entries.get("article_id"))
except (IndexError, ValueError, TypeError):
return -1
return art_id
def get_headers(self, referer: str = "https://www.miyoushe.com/ys/"):
return {"User-Agent": self.USER_AGENT, "Referer": referer}
@staticmethod
def get_list_url_params(forum_id: int, is_good: bool = False, is_hot: bool = False, page_size: int = 20) -> dict:
return {
"forum_id": forum_id,
"gids": 2,
"is_good": is_good,
"is_hot": is_hot,
"page_size": page_size,
"sort_type": 1,
}
@staticmethod
def get_images_params(
resize: int = 600, quality: int = 80, auto_orient: int = 0, interlace: int = 1, images_format: str = "jpg"
):
"""
image/resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,jpg
:param resize: 图片大小
:param quality: 图片质量
:param auto_orient: 自适应
:param interlace: 未知
:param images_format: 图片格式
:return:
"""
params = (
f"image/resize,s_{resize}/quality,q_{quality}/auto-orient,"
f"{auto_orient}/interlace,{interlace}/format,{images_format}"
)
return {"x-oss-process": params}
async def get_official_recommended_posts(self, gids: int) -> JSON_DATA:
params = {"gids": gids}
response = await self.client.get(url=self.GET_OFFICIAL_RECOMMENDED_POSTS_URL, params=params)
return response
async def get_post_full_in_collection(self, collection_id: int, gids: int = 2, order_type=1) -> JSON_DATA:
params = {"collection_id": collection_id, "gids": gids, "order_type": order_type}
response = await self.client.get(url=self.POST_FULL_IN_COLLECTION_URL, params=params)
return response
async def get_post_info(self, gids: int, post_id: int, read: int = 1) -> PostInfo:
params = {"gids": gids, "post_id": post_id, "read": read}
response = await self.client.get(self.POST_FULL_URL, params=params)
return PostInfo.paste_data(response)
async def get_images_by_post_id(self, gids: int, post_id: int) -> List[ArtworkImage]:
post_info = await self.get_post_info(gids, post_id)
art_list = []
task_list = [
self.download_image(post_info.post_id, post_info.image_urls[page], page)
for page in range(len(post_info.image_urls))
]
result_list = await asyncio.gather(*task_list)
for result in result_list:
if isinstance(result, ArtworkImage):
art_list.append(result)
def take_page(elem: ArtworkImage):
return elem.page
art_list.sort(key=take_page)
return art_list
async def download_image(self, art_id: int, url: str, page: int = 0) -> ArtworkImage:
response = await self.client.get(url, params=self.get_images_params(resize=2000), timeout=10, de_json=False)
return ArtworkImage(art_id=art_id, page=page, data=response.content)
async def get_new_list(self, gids: int, type_id: int, page_size: int = 20):
"""
?gids=2&page_size=20&type=3
:return:
"""
params = {"gids": gids, "page_size": page_size, "type": type_id}
response = await self.client.get(url=self.GET_NEW_LIST_URL, params=params)
return response
async def close(self):
await self.client.shutdown()

View File

@ -0,0 +1,149 @@
from typing import Dict
from httpx import AsyncClient
from ...utility.helpers import get_device_id
__all__ = ("SignIn",)
class SignIn:
LOGIN_URL = "https://webapi.account.mihoyo.com/Api/login_by_mobilecaptcha"
S_TOKEN_URL = (
"https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?login_ticket={0}&token_types=3&uid={1}"
)
BBS_URL = "https://api-takumi.mihoyo.com/account/auth/api/webLoginByMobile"
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
)
HEADERS = {
"Host": "webapi.account.mihoyo.com",
"Connection": "keep-alive",
"sec-ch-ua": '".Not/A)Brand";v="99", "Microsoft Edge";v="103", "Chromium";v="103"',
"DNT": "1",
"x-rpc-device_model": "OS X 10.15.7",
"sec-ch-ua-mobile": "?0",
"User-Agent": USER_AGENT,
"x-rpc-device_id": get_device_id(USER_AGENT),
"Accept": "application/json, text/plain, */*",
"x-rpc-device_name": "Microsoft Edge 103.0.1264.62",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"x-rpc-client_type": "4",
"sec-ch-ua-platform": '"macOS"',
"Origin": "https://user.mihoyo.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://user.mihoyo.com/",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
}
BBS_HEADERS = {
"Host": "api-takumi.mihoyo.com",
"Content-Type": "application/json;charset=utf-8",
"Origin": "https://bbs.mihoyo.com",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"User-Agent": USER_AGENT,
"Referer": "https://bbs.mihoyo.com/",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
}
AUTHKEY_API = "https://api-takumi.mihoyo.com/binding/api/genAuthKey"
USER_INFO_API = "https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo"
GACHA_HEADERS = {
"User-Agent": "okhttp/4.8.0",
"x-rpc-app_version": "2.28.1",
"x-rpc-sys_version": "12",
"x-rpc-client_type": "5",
"x-rpc-channel": "mihoyo",
"x-rpc-device_id": get_device_id(USER_AGENT),
"x-rpc-device_name": "Mi 10",
"x-rpc-device_model": "Mi 10",
"Referer": "https://app.mihoyo.com",
"Host": "api-takumi.mihoyo.com",
}
def __init__(self, phone: int = 0, uid: int = 0, cookie: Dict = None):
self.phone = phone
self.client = AsyncClient()
self.uid = uid
self.cookie = cookie if cookie is not None else {}
self.parse_uid()
def parse_uid(self):
"""
从cookie中获取uid
:param self:
:return:
"""
if not self.cookie:
return
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
if item in self.cookie:
self.uid = self.cookie[item]
break
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
self.cookie[item] = self.uid
@staticmethod
def check_error(data: dict) -> bool:
"""
检查是否有错误
:param data:
:return:
"""
res_data = data.get("data", {})
return res_data.get("msg") == "验证码错误" or res_data.get("info") == "Captcha not match Err"
async def login(self, captcha: int) -> bool:
data = await self.client.post(
self.LOGIN_URL,
data={"mobile": str(self.phone), "mobile_captcha": str(captcha), "source": "user.mihoyo.com"},
headers=self.HEADERS,
)
res_json = data.json()
if self.check_error(res_json):
return False
for k, v in data.cookies.items():
self.cookie[k] = v
if "login_ticket" not in self.cookie:
return False
self.parse_uid()
return bool(self.uid)
async def get_s_token(self):
if not self.cookie.get("login_ticket") or not self.uid:
return
data = await self.client.get(
self.S_TOKEN_URL.format(self.cookie["login_ticket"], self.uid), headers={"User-Agent": self.USER_AGENT}
)
res_json = data.json()
res_data = res_json.get("data", {}).get("list", [])
for i in res_data:
if i.get("name") and i.get("token"):
self.cookie[i.get("name")] = i.get("token")
async def get_token(self, captcha: int) -> bool:
data = await self.client.post(
self.BBS_URL,
headers=self.BBS_HEADERS,
json={
"is_bh2": False,
"mobile": str(self.phone),
"captcha": str(captcha),
"action_type": "login",
"token_type": 6,
},
)
res_json = data.json()
if self.check_error(res_json):
return False
for k, v in data.cookies.items():
self.cookie[k] = v
return "cookie_token" in self.cookie or "cookie_token_v2" in self.cookie

View File

@ -0,0 +1,90 @@
import json
import re
import time
from typing import Dict, Optional
from ..base.hyperionrequest import HyperionRequest
from ...utility.helpers import get_ua, get_device_id, get_ds
__all__ = ("Verify",)
class Verify:
HOST = "api-takumi-record.mihoyo.com"
VERIFICATION_HOST = "api.geetest.com"
CREATE_VERIFICATION_URL = "/game_record/app/card/wapi/createVerification"
VERIFY_VERIFICATION_URL = "/game_record/app/card/wapi/verifyVerification"
AJAX_URL = "/ajax.php"
USER_AGENT = get_ua()
BBS_HEADERS = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": USER_AGENT,
"X-Requested-With": "com.mihoyo.hyperion",
"Referer": "https://webstatic.mihoyo.com/",
"x-rpc-device_id": get_device_id(USER_AGENT),
"x-rpc-page": "3.1.3_#/ys",
}
VERIFICATION_HEADERS = {
"Accept": "*/*",
"X-Requested-With": "com.mihoyo.hyperion",
"User-Agent": USER_AGENT,
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
}
def __init__(self, cookies: Dict = None):
self.client = HyperionRequest(headers=self.BBS_HEADERS, cookies=cookies)
def get_verification_headers(self, referer: str):
headers = self.VERIFICATION_HEADERS.copy()
headers["Referer"] = referer
return headers
def get_headers(self, data: dict = None, params: dict = None):
headers = self.BBS_HEADERS.copy()
app_version, client_type, ds = get_ds(new_ds=True, data=data, params=params)
headers["x-rpc-app_version"] = app_version
headers["x-rpc-client_type"] = client_type
headers["DS"] = ds
return headers
@staticmethod
def get_url(host: str, url: str):
return f"https://{host}{url}"
async def create(self, is_high: bool = False):
url = self.get_url(self.HOST, self.CREATE_VERIFICATION_URL)
params = {"is_high": "true" if is_high else "false"}
headers = self.get_headers(params=params)
response = await self.client.get(url, params=params, headers=headers)
return response
async def verify(self, challenge: str, validate: str):
url = self.get_url(self.HOST, self.VERIFY_VERIFICATION_URL)
data = {"geetest_challenge": challenge, "geetest_validate": validate, "geetest_seccode": f"{validate}|jordan"}
headers = self.get_headers(data=data)
response = await self.client.post(url, json=data, headers=headers)
return response
async def ajax(self, referer: str, gt: str, challenge: str) -> Optional[str]:
headers = self.get_verification_headers(referer)
url = self.get_url(self.VERIFICATION_HOST, self.AJAX_URL)
params = {
"gt": gt,
"challenge": challenge,
"lang": "zh-cn",
"pt": 3,
"client_type": "web_mobile",
"callback": f"geetest_{int(time.time() * 1000)}",
}
response = await self.client.get(url, headers=headers, params=params, de_json=False)
text = response.text
json_data = re.findall(r"^.*?\((\{.*?)\)$", text)[0]
data = json.loads(json_data)
if "success" in data["status"] and "success" in data["data"]["result"]:
return data["data"]["validate"]
return None

View File

@ -1,461 +0,0 @@
import asyncio
import json
import re
import time
from datetime import datetime
from json import JSONDecodeError
from typing import List, Optional, Dict
from genshin import Client, InvalidCookies
from genshin.utility.ds import generate_dynamic_secret
from genshin.utility.uid import recognize_genshin_server
from httpx import AsyncClient
from pydantic import BaseModel, validator
from modules.apihelper.base import ArtworkImage, PostInfo
from modules.apihelper.helpers import get_device_id, get_ds, get_ua
from modules.apihelper.request.hoyorequest import HOYORequest
from utils.typedefs import JSONDict
class Hyperion:
"""米忽悠bbs相关API请求
该名称来源于米忽悠的安卓BBS包名结尾考虑到大部分重要的功能确实是在移动端实现了
"""
POST_FULL_URL = "https://bbs-api.mihoyo.com/post/wapi/getPostFull"
POST_FULL_IN_COLLECTION_URL = "https://bbs-api.mihoyo.com/post/wapi/getPostFullInCollection"
GET_NEW_LIST_URL = "https://bbs-api.mihoyo.com/post/wapi/getNewsList"
GET_OFFICIAL_RECOMMENDED_POSTS_URL = "https://bbs-api.mihoyo.com/post/wapi/getOfficialRecommendedPosts"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/90.0.4430.72 Safari/537.36"
)
def __init__(self):
self.client = HOYORequest(headers=self.get_headers())
@staticmethod
def extract_post_id(text: str) -> int:
"""
:param text:
# https://bbs.mihoyo.com/ys/article/8808224
# https://m.bbs.mihoyo.com/ys/article/8808224
:return: post_id
"""
rgx = re.compile(r"(?:bbs\.)?mihoyo\.com/[^.]+/article/(?P<article_id>\d+)")
matches = rgx.search(text)
if matches is None:
return -1
entries = matches.groupdict()
if entries is None:
return -1
try:
art_id = int(entries.get("article_id"))
except (IndexError, ValueError, TypeError):
return -1
return art_id
def get_headers(self, referer: str = "https://bbs.mihoyo.com/"):
return {"User-Agent": self.USER_AGENT, "Referer": referer}
@staticmethod
def get_list_url_params(forum_id: int, is_good: bool = False, is_hot: bool = False, page_size: int = 20) -> dict:
return {
"forum_id": forum_id,
"gids": 2,
"is_good": is_good,
"is_hot": is_hot,
"page_size": page_size,
"sort_type": 1,
}
@staticmethod
def get_images_params(
resize: int = 600, quality: int = 80, auto_orient: int = 0, interlace: int = 1, images_format: str = "jpg"
):
"""
image/resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,jpg
:param resize: 图片大小
:param quality: 图片质量
:param auto_orient: 自适应
:param interlace: 未知
:param images_format: 图片格式
:return:
"""
params = (
f"image/resize,s_{resize}/quality,q_{quality}/auto-orient,"
f"{auto_orient}/interlace,{interlace}/format,{images_format}"
)
return {"x-oss-process": params}
async def get_official_recommended_posts(self, gids: int) -> JSONDict:
params = {"gids": gids}
response = await self.client.get(url=self.GET_OFFICIAL_RECOMMENDED_POSTS_URL, params=params)
return response
async def get_post_full_in_collection(self, collection_id: int, gids: int = 2, order_type=1) -> JSONDict:
params = {"collection_id": collection_id, "gids": gids, "order_type": order_type}
response = await self.client.get(url=self.POST_FULL_IN_COLLECTION_URL, params=params)
return response
async def get_post_info(self, gids: int, post_id: int, read: int = 1) -> PostInfo:
params = {"gids": gids, "post_id": post_id, "read": read}
response = await self.client.get(self.POST_FULL_URL, params=params)
return PostInfo.paste_data(response)
async def get_images_by_post_id(self, gids: int, post_id: int) -> List[ArtworkImage]:
post_info = await self.get_post_info(gids, post_id)
art_list = []
task_list = [
self.download_image(post_info.post_id, post_info.image_urls[page], page)
for page in range(len(post_info.image_urls))
]
result_list = await asyncio.gather(*task_list)
for result in result_list:
if isinstance(result, ArtworkImage):
art_list.append(result)
def take_page(elem: ArtworkImage):
return elem.page
art_list.sort(key=take_page)
return art_list
async def download_image(self, art_id: int, url: str, page: int = 0) -> ArtworkImage:
response = await self.client.get(url, params=self.get_images_params(resize=2000), timeout=10, de_json=False)
return ArtworkImage(art_id=art_id, page=page, data=response.content)
async def get_new_list(self, gids: int, type_id: int, page_size: int = 20):
"""
?gids=2&page_size=20&type=3
:return:
"""
params = {"gids": gids, "page_size": page_size, "type": type_id}
response = await self.client.get(url=self.GET_NEW_LIST_URL, params=params)
return response
async def close(self):
await self.client.shutdown()
class GachaInfoObject(BaseModel):
begin_time: datetime
end_time: datetime
gacha_id: str
gacha_name: str
gacha_type: int
@validator("begin_time", "end_time", pre=True, allow_reuse=True)
def validate_time(cls, v):
return datetime.strptime(v, "%Y-%m-%d %H:%M:%S")
class GachaInfo:
GACHA_LIST_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/gacha/list.json"
GACHA_INFO_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/%s/zh-cn.json"
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/90.0.4430.72 Safari/537.36"
)
def __init__(self):
self.headers = {
"User-Agent": self.USER_AGENT,
}
self.client = HOYORequest(headers=self.headers)
self.cache = {}
self.cache_ttl = 600
async def get_gacha_list_info(self) -> List[GachaInfoObject]:
if self.cache.get("time", 0) + self.cache_ttl < time.time():
self.cache.clear()
cache = self.cache.get("gacha_list_info")
if cache is not None:
return cache
req = await self.client.get(self.GACHA_LIST_URL)
data = [GachaInfoObject(**i) for i in req["list"]]
self.cache["gacha_list_info"] = data
self.cache["time"] = time.time()
return data
async def get_gacha_info(self, gacha_id: str) -> dict:
cache = self.cache.get(gacha_id)
if cache is not None:
return cache
req = await self.client.get(self.GACHA_INFO_URL % gacha_id)
self.cache[gacha_id] = req
return req
async def close(self):
await self.client.shutdown()
class SignIn:
LOGIN_URL = "https://webapi.account.mihoyo.com/Api/login_by_mobilecaptcha"
S_TOKEN_URL = (
"https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?login_ticket={0}&token_types=3&uid={1}"
)
BBS_URL = "https://api-takumi.mihoyo.com/account/auth/api/webLoginByMobile"
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
)
HEADERS = {
"Host": "webapi.account.mihoyo.com",
"Connection": "keep-alive",
"sec-ch-ua": '".Not/A)Brand";v="99", "Microsoft Edge";v="103", "Chromium";v="103"',
"DNT": "1",
"x-rpc-device_model": "OS X 10.15.7",
"sec-ch-ua-mobile": "?0",
"User-Agent": USER_AGENT,
"x-rpc-device_id": get_device_id(USER_AGENT),
"Accept": "application/json, text/plain, */*",
"x-rpc-device_name": "Microsoft Edge 103.0.1264.62",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"x-rpc-client_type": "4",
"sec-ch-ua-platform": '"macOS"',
"Origin": "https://user.mihoyo.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://user.mihoyo.com/",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
}
BBS_HEADERS = {
"Host": "api-takumi.mihoyo.com",
"Content-Type": "application/json;charset=utf-8",
"Origin": "https://bbs.mihoyo.com",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"User-Agent": USER_AGENT,
"Referer": "https://bbs.mihoyo.com/",
"Accept-Language": "zh-CN,zh-Hans;q=0.9",
}
AUTHKEY_API = "https://api-takumi.mihoyo.com/binding/api/genAuthKey"
USER_INFO_API = "https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo"
GACHA_HEADERS = {
"User-Agent": "okhttp/4.8.0",
"x-rpc-app_version": "2.28.1",
"x-rpc-sys_version": "12",
"x-rpc-client_type": "5",
"x-rpc-channel": "mihoyo",
"x-rpc-device_id": get_device_id(USER_AGENT),
"x-rpc-device_name": "Mi 10",
"x-rpc-device_model": "Mi 10",
"Referer": "https://app.mihoyo.com",
"Host": "api-takumi.mihoyo.com",
}
def __init__(self, phone: int = 0, uid: int = 0, cookie: Dict = None):
self.phone = phone
self.client = AsyncClient()
self.uid = uid
self.cookie = cookie if cookie is not None else {}
self.parse_uid()
def parse_uid(self):
"""
从cookie中获取uid
:param self:
:return:
"""
if not self.cookie:
return
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
if item in self.cookie:
self.uid = self.cookie[item]
break
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
self.cookie[item] = self.uid
@staticmethod
def check_error(data: dict) -> bool:
"""
检查是否有错误
:param data:
:return:
"""
res_data = data.get("data", {})
return res_data.get("msg") == "验证码错误" or res_data.get("info") == "Captcha not match Err"
async def login(self, captcha: int) -> bool:
data = await self.client.post(
self.LOGIN_URL,
data={"mobile": str(self.phone), "mobile_captcha": str(captcha), "source": "user.mihoyo.com"},
headers=self.HEADERS,
)
res_json = data.json()
if self.check_error(res_json):
return False
for k, v in data.cookies.items():
self.cookie[k] = v
if "login_ticket" not in self.cookie:
return False
self.parse_uid()
return bool(self.uid)
async def get_s_token(self):
if not self.cookie.get("login_ticket") or not self.uid:
return
data = await self.client.get(
self.S_TOKEN_URL.format(self.cookie["login_ticket"], self.uid), headers={"User-Agent": self.USER_AGENT}
)
res_json = data.json()
res_data = res_json.get("data", {}).get("list", [])
for i in res_data:
if i.get("name") and i.get("token"):
self.cookie[i.get("name")] = i.get("token")
async def get_token(self, captcha: int) -> bool:
data = await self.client.post(
self.BBS_URL,
headers=self.BBS_HEADERS,
json={
"is_bh2": False,
"mobile": str(self.phone),
"captcha": str(captcha),
"action_type": "login",
"token_type": 6,
},
)
res_json = data.json()
if self.check_error(res_json):
return False
for k, v in data.cookies.items():
self.cookie[k] = v
return "cookie_token" in self.cookie or "cookie_token_v2" in self.cookie
@staticmethod
async def get_authkey_by_stoken(client: Client) -> Optional[str]:
"""通过 stoken 获取 authkey"""
try:
headers = SignIn.GACHA_HEADERS.copy()
headers["DS"] = generate_dynamic_secret("ulInCDohgEs557j0VsPDYnQaaz6KJcv5")
data = await client.cookie_manager.request(
SignIn.AUTHKEY_API,
method="POST",
json={
"auth_appid": "webview_gacha",
"game_biz": "hk4e_cn",
"game_uid": client.uid,
"region": recognize_genshin_server(client.uid),
},
headers=headers,
)
return data.get("authkey")
except JSONDecodeError:
pass
except InvalidCookies:
pass
return None
@staticmethod
async def get_v2_account_id(client: Client) -> Optional[int]:
"""获取 v2 account_id"""
try:
headers = SignIn.GACHA_HEADERS.copy()
headers["DS"] = generate_dynamic_secret("ulInCDohgEs557j0VsPDYnQaaz6KJcv5")
data = await client.cookie_manager.request(
SignIn.USER_INFO_API,
method="GET",
params={"gids": "2"},
headers=headers,
)
uid = data.get("user_info", {}).get("uid", None)
if uid:
uid = int(uid)
return uid
except JSONDecodeError:
pass
return None
class Verification:
HOST = "api-takumi-record.mihoyo.com"
VERIFICATION_HOST = "api.geetest.com"
CREATE_VERIFICATION_URL = "/game_record/app/card/wapi/createVerification"
VERIFY_VERIFICATION_URL = "/game_record/app/card/wapi/verifyVerification"
AJAX_URL = "/ajax.php"
USER_AGENT = get_ua()
BBS_HEADERS = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": USER_AGENT,
"X-Requested-With": "com.mihoyo.hyperion",
"Referer": "https://webstatic.mihoyo.com/",
"x-rpc-device_id": get_device_id(USER_AGENT),
"x-rpc-page": "3.1.3_#/ys",
}
VERIFICATION_HEADERS = {
"Accept": "*/*",
"X-Requested-With": "com.mihoyo.hyperion",
"User-Agent": USER_AGENT,
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
}
def __init__(self, cookies: Dict = None):
self.client = HOYORequest(headers=self.BBS_HEADERS, cookies=cookies)
def get_verification_headers(self, referer: str):
headers = self.VERIFICATION_HEADERS.copy()
headers["Referer"] = referer
return headers
def get_headers(self, data: dict = None, params: dict = None):
headers = self.BBS_HEADERS.copy()
app_version, client_type, ds = get_ds(new_ds=True, data=data, params=params)
headers["x-rpc-app_version"] = app_version
headers["x-rpc-client_type"] = client_type
headers["DS"] = ds
return headers
@staticmethod
def get_url(host: str, url: str):
return f"https://{host}{url}"
async def create(self, is_high: bool = False):
url = self.get_url(self.HOST, self.CREATE_VERIFICATION_URL)
params = {"is_high": "true" if is_high else "false"}
headers = self.get_headers(params=params)
response = await self.client.get(url, params=params, headers=headers)
return response
async def verify(self, challenge: str, validate: str):
url = self.get_url(self.HOST, self.VERIFY_VERIFICATION_URL)
data = {"geetest_challenge": challenge, "geetest_validate": validate, "geetest_seccode": f"{validate}|jordan"}
headers = self.get_headers(data=data)
response = await self.client.post(url, json=data, headers=headers)
return response
async def ajax(self, referer: str, gt: str, challenge: str) -> Optional[str]:
headers = self.get_verification_headers(referer)
url = self.get_url(self.VERIFICATION_HOST, self.AJAX_URL)
params = {
"gt": gt,
"challenge": challenge,
"lang": "zh-cn",
"pt": 3,
"client_type": "web_mobile",
"callback": f"geetest_{int(time.time() * 1000)}",
}
response = await self.client.get(url, headers=headers, params=params, de_json=False)
text = response.text
json_data = re.findall(r"^.*?\((\{.*?)\)$", text)[0]
data = json.loads(json_data)
if "success" in data["status"] and "success" in data["data"]["result"]:
return data["data"]["validate"]
return None

View File

@ -0,0 +1,3 @@
import logging
logger = logging.getLogger("ApiHelper")

View File

View File

@ -1,8 +1,8 @@
import time
from typing import List, Optional, Any
import httpx
from pydantic import BaseModel, parse_obj_as, validator
from pydantic import BaseModel, validator
__all__ = ("Member", "TeamRate", "FullTeamRate", "TeamRateResult")
class Member(BaseModel):
@ -69,39 +69,3 @@ class TeamRateResult(BaseModel):
if len(data) >= 3:
break
return data
class AbyssTeamData:
TEAM_RATE_API = "https://www.youchuang.fun/gamerole/formationRate"
HEADERS = {
"Host": "www.youchuang.fun",
"Referer": "https://servicewechat.com/wxce4dbe0cb0f764b3/91/page-frame.html",
"User-Agent": "Mozilla/5.0 (iPad; CPU OS 15_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) "
"Mobile/15E148 MicroMessenger/8.0.20(0x1800142f) NetType/WIFI Language/zh_CN",
"content-type": "application/json",
}
VERSION = "3.2"
def __init__(self):
self.client = httpx.AsyncClient(headers=self.HEADERS)
self.time = 0
self.data = None
self.ttl = 10 * 60
async def get_data(self) -> TeamRateResult:
if self.data is None or self.time + self.ttl < time.time():
data_up = await self.client.post(self.TEAM_RATE_API, json={"version": self.VERSION, "layer": 1})
data_up_json = data_up.json()["result"]
data_down = await self.client.post(self.TEAM_RATE_API, json={"version": self.VERSION, "layer": 2})
data_down_json = data_down.json()["result"]
self.data = TeamRateResult(
version=self.VERSION,
rate_list_up=parse_obj_as(List[TeamRate], data_up_json["rateList"]),
rate_list_down=parse_obj_as(List[TeamRate], data_down_json["rateList"]),
user_count=data_up_json["userCount"],
)
self.time = time.time()
return self.data.copy(deep=True)
async def close(self):
await self.client.aclose()

View File

@ -0,0 +1,17 @@
from datetime import datetime
from pydantic import BaseModel, validator
__all__ = ("GachaInfo",)
class GachaInfo(BaseModel):
begin_time: datetime
end_time: datetime
gacha_id: str
gacha_name: str
gacha_type: int
@validator("begin_time", "end_time", pre=True, allow_reuse=True)
def validate_time(cls, v):
return datetime.strptime(v, "%Y-%m-%d %H:%M:%S")

View File

@ -3,6 +3,8 @@ from typing import List, Any
from pydantic import BaseModel, PrivateAttr
__all__ = ("ArtworkImage", "PostInfo")
class ArtworkImage(BaseModel):
art_id: int

View File

@ -1,4 +1,6 @@
from typing import Dict, Any
__all__ = ("POST_DATA", "JSON_DATA")
POST_DATA = Dict[str, Any]
JSON_DATA = Dict[str, Any]

View File

View File

@ -6,6 +6,8 @@ import time
import uuid
from typing import Mapping, Any, Optional
__all__ = ("get_device_id", "hex_digest", "get_ds", "get_recognize_server", "get_ua")
RECOGNIZE_SERVER = {
"1": "cn_gf01",
"2": "cn_gf01",
@ -87,5 +89,5 @@ def get_ua(device: str = "Paimon Build", version: str = "2.36.1"):
return (
f"Mozilla/5.0 (Linux; Android 12; {device}; wv) "
"AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 "
f"{'miHoYoBBS/'+version if version else ''}"
f"{'miHoYoBBS/' + version if version else ''}"
)

View File

@ -11,7 +11,7 @@ from core.template import TemplateService
from core.user import UserService
from core.user.error import UserNotFoundError
from metadata.shortname import roleToId
from modules.apihelper.abyss_team import AbyssTeamData
from modules.apihelper.client.components.abyss import AbyssTeam as AbyssTeamClient
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
@ -27,7 +27,7 @@ class AbyssTeam(Plugin, BasePlugin):
self.template_service = template_service
self.user_service = user_service
self.assets_service = assets
self.team_data = AbyssTeamData()
self.team_data = AbyssTeamClient()
@handler(CommandHandler, command="abyss_team", block=False)
@handler(MessageHandler, filters=filters.Regex("^深渊推荐配队(.*)"), block=False)

View File

@ -17,7 +17,7 @@ from core.plugin import Plugin, conversation, handler
from core.user.error import UserNotFoundError
from core.user.models import User
from core.user.services import UserService
from modules.apihelper.hyperion import SignIn
from modules.apihelper.client.components.signin import SignIn
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.log import logger
@ -299,7 +299,7 @@ class SetUserCookies(Plugin.Conversation, BasePlugin.Conversation):
if "account_mid_v2" in cookies:
logger.info("检测到用户 %s[%s] 使用 V2 Cookie 正在尝试获取 account_id", user.full_name, user.id)
if client.region == types.Region.CHINESE:
account_info = await client.get_hoyolab_user(-1)
account_info = await client.get_hoyolab_user()
account_id = account_info.hoyolab_id
add_user_command_data.cookies["account_id"] = str(account_id)
logger.success("获取用户 %s[%s] account_id[%s] 成功", user.full_name, user.id, account_id)

View File

@ -16,7 +16,8 @@ from core.plugin import Plugin, handler
from core.template import TemplateService
from metadata.genshin import AVATAR_DATA, WEAPON_DATA, avatar_to_game_id, weapon_to_game_id
from metadata.shortname import weaponToName
from modules.apihelper.hyperion import GachaInfo, GachaInfoObject
from modules.apihelper.client.components.gacha import Gacha as GachaClient
from modules.apihelper.models.genshin.gacha import GachaInfo
from modules.gacha.banner import BannerType, GachaBanner
from modules.gacha.player.info import PlayerGachaInfo
from modules.gacha.system import BannerSystem
@ -59,8 +60,8 @@ class GachaRedis:
class GachaHandle:
def __init__(self, hyperion: Optional[GachaInfo] = None):
self.hyperion = GachaInfo() if hyperion is None else hyperion
def __init__(self):
self.hyperion = GachaClient()
async def de_banner(self, gacha_id: str, gacha_type: int) -> Optional[GachaBanner]:
gacha_info = await self.hyperion.get_gacha_info(gacha_id)
@ -109,7 +110,7 @@ class GachaHandle:
banner.banner_type = BannerType.STANDARD
return banner
async def gacha_base_info(self, gacha_name: str = "角色活动", default: bool = False) -> GachaInfoObject:
async def gacha_base_info(self, gacha_name: str = "角色活动", default: bool = False) -> GachaInfo:
gacha_list_info = await self.hyperion.get_gacha_list_info()
now = datetime.now()
for gacha in gacha_list_info:
@ -145,7 +146,7 @@ class Gacha(Plugin, BasePlugin):
self._look = asyncio.Lock()
self.assets_service = assets
async def get_banner(self, gacha_base_info: GachaInfoObject):
async def get_banner(self, gacha_base_info: GachaInfo):
async with self._look:
banner = self.banner_cache.get(gacha_base_info.gacha_id)
if banner is None:
@ -202,7 +203,7 @@ class Gacha(Plugin, BasePlugin):
except GachaNotFound:
await message.reply_text("当前卡池正在替换中,请稍后重试。")
return
logger.info(f"用户 {user.full_name}[{user.id}] 抽卡模拟器命令请求 || 参数 {gacha_name}")
logger.info("用户 %s[%s] 抽卡模拟器命令请求 || 参数 %s", user.full_name, user.id, gacha_name)
# 用户数据储存和处理
await message.reply_chat_action(ChatAction.TYPING)
banner = await self.get_banner(gacha_base_info)
@ -234,10 +235,10 @@ class Gacha(Plugin, BasePlugin):
"items": [],
"wish_name": "",
}
logger.debug(f"{banner.banner_id}")
logger.debug(f"{banner.banner_type}")
logger.debug(f"{banner.rate_up_items5}")
logger.debug(f"{banner.fallback_items5_pool1}")
# logger.debug(f"{banner.banner_id}")
# logger.debug(f"{banner.banner_type}")
# logger.debug(f"{banner.rate_up_items5}")
# logger.debug(f"{banner.fallback_items5_pool1}")
if player_gacha_banner_info.wish_item_id != 0:
weapon = WEAPON_DATA.get(str(player_gacha_banner_info.wish_item_id))
if weapon is not None:

View File

@ -19,7 +19,6 @@ from core.template.models import FileType
from core.user import UserService
from core.user.error import UserNotFoundError
from metadata.scripts.paimon_moe import update_paimon_moe_zh, GACHA_LOG_PAIMON_MOE_PATH
from modules.apihelper.hyperion import SignIn
from modules.gacha_log.error import (
GachaLogInvalidAuthkey,
PaimonMoeGachaLogFileError,
@ -34,6 +33,7 @@ from utils.bot import get_args
from utils.decorators.admins import bot_admins_rights_check
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.genshin import get_authkey_by_stoken
from utils.helpers import get_genshin_client
from utils.log import logger
from utils.models.base import RegionEnum
@ -181,7 +181,7 @@ class GachaLogPlugin(Plugin.Conversation, BasePlugin.Conversation):
lang="zh-cn",
uid=user_info.yuanshen_uid,
)
authkey = await SignIn.get_authkey_by_stoken(client)
authkey = await get_authkey_by_stoken(client)
if not authkey:
await message.reply_text(
"<b>开始导入祈愿历史记录:请通过 https://paimon.moe/wish/import 获取抽卡记录链接后发送给我"

View File

@ -26,7 +26,7 @@ from core.sign.models import Sign as SignUser, SignStatusEnum
from core.sign.services import SignServices
from core.user.error import UserNotFoundError
from core.user.services import UserService
from modules.apihelper.hyperion import Verification
from modules.apihelper.client.components.verify import Verify
from utils.bot import get_args
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
@ -51,7 +51,7 @@ class SignSystem:
def __init__(self, redis: RedisDB):
self.cache = redis.client
self.qname = "plugin:sign:"
self.verification = Verification()
self.verify = Verify()
async def get_challenge(self, uid: int) -> Tuple[Optional[str], Optional[str]]:
data = await self.cache.get(f"{self.qname}{uid}")
@ -172,7 +172,7 @@ class SignSystem:
gt = request_daily_reward.get("gt", "")
challenge = request_daily_reward.get("challenge", "")
logger.warning("UID[%s] 触发验证码\ngt[%s]\nchallenge[%s]", client.uid, gt, challenge)
validate = await self.verification.ajax(
validate = await self.verify.ajax(
referer=self.REFERER,
gt=gt,
challenge=challenge,

View File

@ -12,8 +12,8 @@ from core.cookies.error import CookiesNotFoundError
from core.plugin import Plugin, handler
from core.user import UserService
from core.user.error import UserNotFoundError
from modules.apihelper.client.components.verify import Verify
from modules.apihelper.error import ResponseException
from modules.apihelper.hyperion import Verification
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
@ -62,7 +62,7 @@ class VerificationPlugins(Plugin, BasePlugin):
await message.reply_text("检测到用户为UID绑定无需认证")
return
is_high: bool = False
verification = Verification(cookies=client.cookie_manager.cookies)
verification = Verify(cookies=client.cookie_manager.cookies)
if not context.args:
try:
await client.get_genshin_notes()

View File

@ -19,9 +19,9 @@ from core.baseplugin import BasePlugin
from core.bot import bot
from core.config import config
from core.plugin import Plugin, conversation, handler
from modules.apihelper.base import ArtworkImage
from modules.apihelper.client.components.hyperion import Hyperion
from modules.apihelper.error import APIHelperException
from modules.apihelper.hyperion import Hyperion
from modules.apihelper.models.genshin.hyperion import ArtworkImage
from utils.decorators.admins import bot_admins_rights_check
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
@ -275,7 +275,7 @@ class Post(Plugin.Conversation, BasePlugin.Conversation):
name = channel_info.name
reply_keyboard.append([f"{name}"])
except KeyError as error:
logger.error("从配置文件获取频道信息发生错误,退出任务", error)
logger.error("从配置文件获取频道信息发生错误,退出任务", exc_info=error)
await message.reply_text("从配置文件获取频道信息发生错误,退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
await message.reply_text("请选择你要推送的频道", reply_markup=ReplyKeyboardMarkup(reply_keyboard, True, True))
@ -293,7 +293,7 @@ class Post(Plugin.Conversation, BasePlugin.Conversation):
if message.text == channel_info.name:
channel_id = channel_info.chat_id
except KeyError as exc:
logger.error("从配置文件获取频道信息发生错误,退出任务", exc)
logger.error("从配置文件获取频道信息发生错误,退出任务", exc_info=exc)
logger.exception(exc)
await message.reply_text("从配置文件获取频道信息发生错误,退出任务", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END

View File

@ -13,8 +13,8 @@ from core.cookies.error import CookiesNotFoundError
from core.plugin import handler, Plugin
from core.user import UserService
from core.user.error import UserNotFoundError
from modules.apihelper.client.components.verify import Verify
from modules.apihelper.error import ResponseException, APIHelperException
from modules.apihelper.hyperion import Verification
from plugins.genshin.sign import SignSystem, NeedChallenge
from plugins.genshin.verification import VerificationSystem
from utils.decorators.error import error_callable
@ -144,9 +144,9 @@ class StartPlugin(Plugin):
"如果出现频繁验证请求建议暂停使用本Bot在内的第三方工具查询功能。\n"
"在暂停使用期间依然出现频繁认证,建议修改密码以保护账号安全。"
)
verification = Verification(cookies=client.cookie_manager.cookies)
verification = Verify(cookies=client.cookie_manager.cookies)
try:
data = await verification.create(is_high=True)
data = await verification.create()
challenge = data["challenge"]
gt = data["gt"]
logger.success("用户 %s[%s] 创建验证成功\ngt:%s\nchallenge%s", user.full_name, user.id, gt, challenge)

View File

@ -12,8 +12,8 @@ from core.cookies.error import CookiesNotFoundError
from core.plugin import Plugin, handler
from core.user import UserService
from core.user.error import UserNotFoundError
from modules.apihelper.client.components.verify import Verify
from modules.apihelper.error import ResponseException
from modules.apihelper.hyperion import Verification
from plugins.genshin.verification import VerificationSystem
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
@ -72,7 +72,7 @@ class WebApp(Plugin):
except CookiesNotFoundError:
await message.reply_text("检测到用户为UID绑定无需认证", reply_markup=ReplyKeyboardRemove())
return
verification = Verification(cookies=client.cookie_manager.cookies)
verify = Verify(cookies=client.cookie_manager.cookies)
if validate:
_, challenge = await self.verification_system.get_challenge(client.uid)
if challenge:
@ -84,7 +84,7 @@ class WebApp(Plugin):
validate,
)
try:
await verification.verify(challenge=challenge, validate=validate)
await verify.verify(challenge=challenge, validate=validate)
logger.success("用户 %s[%s] 验证成功", user.full_name, user.id)
await message.reply_text("验证成功", reply_markup=ReplyKeyboardRemove())
except ResponseException as exc:
@ -113,7 +113,7 @@ class WebApp(Plugin):
await message.reply_text("账户正常,无需认证")
return
try:
data = await verification.create(is_high=True)
data = await verify.create(is_high=True)
challenge = data["challenge"]
gt = data["gt"]
logger.success("用户 %s[%s] 创建验证成功\ngt:%s\nchallenge%s", user.full_name, user.id, gt, challenge)

View File

@ -4,14 +4,15 @@ import pytest
import pytest_asyncio
from flaky import flaky
from modules.apihelper.abyss_team import AbyssTeamData, TeamRateResult, TeamRate, FullTeamRate
from modules.apihelper.client.components.abyss import AbyssTeam
from modules.apihelper.models.genshin.abyss import TeamRateResult, TeamRate, FullTeamRate
LOGGER = logging.getLogger(__name__)
@pytest_asyncio.fixture
async def abyss_team_data():
_abyss_team_data = AbyssTeamData()
_abyss_team_data = AbyssTeam()
yield _abyss_team_data
await _abyss_team_data.close()
@ -19,7 +20,7 @@ async def abyss_team_data():
# noinspection PyShadowingNames
@pytest.mark.asyncio
@flaky(3, 1)
async def test_abyss_team_data(abyss_team_data: AbyssTeamData):
async def test_abyss_team_data(abyss_team_data: AbyssTeam):
team_data = await abyss_team_data.get_data()
assert isinstance(team_data, TeamRateResult)
assert isinstance(team_data.rate_list_up[0], TeamRate)

View File

@ -2,7 +2,7 @@ import pytest
import pytest_asyncio
from flaky import flaky
from modules.apihelper.hyperion import Hyperion
from modules.apihelper.client.components.hyperion import Hyperion
@pytest_asyncio.fixture

View File

@ -8,8 +8,8 @@ import pytest_asyncio
from bs4 import BeautifulSoup
from flaky import flaky
from modules.apihelper.base import PostInfo
from modules.apihelper.hyperion import Hyperion
from modules.apihelper.client.components.hyperion import Hyperion
from modules.apihelper.models.genshin.hyperion import PostInfo
LOGGER = logging.getLogger(__name__)

18
utils/genshin.py Normal file
View File

@ -0,0 +1,18 @@
from typing import Optional
from genshin import Client
from genshin.utility import recognize_genshin_server
AUTHKEY_API = "https://api-takumi.mihoyo.com/binding/api/genAuthKey"
async def get_authkey_by_stoken(client: Client) -> Optional[str]:
"""通过 stoken 获取 authkey"""
json = {
"auth_appid": "webview_gacha",
"game_biz": "hk4e_cn",
"game_uid": client.uid,
"region": recognize_genshin_server(client.uid),
}
data = await client.request_bbs(AUTHKEY_API, method="POST", data=json)
return data.get("authkey")

View File

@ -3,11 +3,11 @@ import typing
import aiohttp.typedefs
import genshin # pylint: disable=W0406
import yarl
from genshin import constants, types, utility, models
from genshin import constants, types, utility
from genshin.client import routes
from genshin.utility import generate_dynamic_secret
from genshin.utility import generate_dynamic_secret, ds
from modules.apihelper.helpers import get_ds, get_ua, get_device_id, hex_digest
from modules.apihelper.utility.helpers import get_ds, get_ua, get_device_id, hex_digest
from utils.patch.methods import patch, patchable
DEVICE_ID = get_device_id()
@ -169,6 +169,61 @@ class BaseClient:
return response
@patchable
async def request_bbs(
self,
url: aiohttp.typedefs.StrOrURL,
*,
lang: typing.Optional[str] = None,
region: typing.Optional[types.Region] = None,
method: typing.Optional[str] = None,
params: typing.Optional[typing.Mapping[str, typing.Any]] = None,
data: typing.Any = None,
headers: typing.Optional[aiohttp.typedefs.LooseHeaders] = None,
**kwargs: typing.Any,
) -> typing.Mapping[str, typing.Any]:
"""Make a request any bbs endpoint."""
if lang is not None and lang not in constants.LANGS:
raise ValueError(f"{lang} is not a valid language, must be one of: " + ", ".join(constants.LANGS))
lang = lang or self.lang
region = region or self.region
url = routes.BBS_URL.get_url(region).join(yarl.URL(url))
headers = dict(headers or {})
if self.region == types.Region.CHINESE:
if self.region == types.Region.CHINESE:
account_id = self.cookie_manager.user_id
if account_id:
device_id = hex_digest(str(account_id))
else:
account_mid_v2 = get_account_mid_v2(self.cookie_manager.cookies)
if account_mid_v2:
device_id = hex_digest(account_mid_v2)
else:
device_id = DEVICE_ID
ds_sign = generate_dynamic_secret("ulInCDohgEs557j0VsPDYnQaaz6KJcv5")
ua = get_ua(device="Paimon Build " + device_id[0:5], version="2.40.0")
add_headers = {
"User-Agent": ua,
"Referer": "https://www.miyoushe.com/ys/",
"x-rpc-device_id": get_device_id(device_id),
"x-rpc-app_version": "2.40.0",
"x-rpc-client_type": "4",
"ds": ds_sign,
}
headers.update(add_headers)
elif self.region == types.Region.OVERSEAS:
headers.update(ds.get_ds_headers(data=data, params=params, region=region, lang=lang or self.lang))
headers["Referer"] = str(routes.BBS_REFERER_URL.get_url(self.region))
else:
raise TypeError(f"{region!r} is not a valid region.")
data = await self.request(url, method=method, params=params, data=data, headers=headers, **kwargs)
return data
@patch(genshin.client.components.daily.DailyRewardClient) # noqa
class DailyRewardClient:
@ -253,44 +308,3 @@ class DailyRewardClient:
kwargs.pop("validate", None)
return await self.request(url, method=method, params=params, headers=headers, **kwargs)
@patch(genshin.client.components.hoyolab.HoyolabClient) # noqa
class HoyolabClient:
@patchable
async def get_hoyolab_user(
self, hoyolab_id: int, *, lang: typing.Optional[str] = None
) -> models.PartialHoyolabUser:
"""Get a hoyolab user."""
# todo: use routes.py instead of putting full urls in methods
if self.region == types.Region.OVERSEAS:
if hoyolab_id <= 0:
raise TypeError(f"{hoyolab_id} is not a valid hoyolab id.")
url = "https://bbs-api-os.hoyolab.com/community/painter/wapi/user/full"
data = await self.request_hoyolab(url, params=dict(uid=hoyolab_id), lang=lang)
return models.FullHoyolabUser(**data["user_info"])
elif self.region == types.Region.CHINESE:
url = "https://bbs-api.mihoyo.com/user/wapi/getUserFullInfo"
account_id = self.cookie_manager.user_id
if account_id:
device_id = hex_digest(str(account_id))
else:
account_mid_v2 = get_account_mid_v2(self.cookie_manager.cookies)
if account_mid_v2:
device_id = hex_digest(account_mid_v2)
else:
device_id = DEVICE_ID
ds_sign = generate_dynamic_secret("ulInCDohgEs557j0VsPDYnQaaz6KJcv5")
ua = get_ua(device="Paimon Build " + device_id[0:5], version="2.40.0")
headers = {
"User-Agent": ua,
"Referer": "https://www.miyoushe.com/ys/",
"x-rpc-device_id": get_device_id(device_id),
"x-rpc-app_version": "2.40.0",
"x-rpc-client_type": "4",
"ds": ds_sign,
}
data = await self.request(url, method="GET", params=dict(gids=2), headers=headers)
return models.PartialHoyolabUser(**data["user_info"])
else:
raise TypeError(f"{self.region!r} is not a valid region.")