mirror of
https://github.com/PaiGramTeam/SIMNet.git
synced 2024-11-21 21:58:05 +00:00
✨ Add Base Client
This commit is contained in:
parent
8ac1d365a1
commit
90ee5725e0
342
simnet/client/base.py
Normal file
342
simnet/client/base.py
Normal file
@ -0,0 +1,342 @@
|
||||
import logging
|
||||
import uuid
|
||||
from types import TracebackType
|
||||
from typing import AsyncContextManager, Type, Optional, Any
|
||||
|
||||
from httpx import AsyncClient, TimeoutException, Response, HTTPError
|
||||
|
||||
from simnet.client.cookies import Cookies
|
||||
from simnet.client.headers import Headers
|
||||
from simnet.errors import TimedOut, NetworkError, BadRequest, raise_for_ret_code
|
||||
from simnet.utils.ds import generate_dynamic_secret
|
||||
from simnet.utils.enum_ import Region
|
||||
from simnet.utils.types import (
|
||||
RT,
|
||||
HeaderTypes,
|
||||
CookieTypes,
|
||||
RequestData,
|
||||
QueryParamTypes,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger("simnet.BaseClient")
|
||||
|
||||
|
||||
class BaseClient(AsyncContextManager["BaseClient"]):
|
||||
"""
|
||||
This is the base class for simnet clients. It provides common methods and properties for simnet clients.
|
||||
|
||||
Parameters:
|
||||
-----------
|
||||
cookies: Optional[CookieTypes]
|
||||
The cookies used for the client.
|
||||
headers: Optional[HeaderTypes]
|
||||
The headers used for the client.
|
||||
account_id: Optional[int]
|
||||
The account id used for the client.
|
||||
player_id: Optional[int]
|
||||
The player id used for the client.
|
||||
region: Region
|
||||
The region used for the client.
|
||||
lang: str
|
||||
The language used for the client.
|
||||
|
||||
Attributes:
|
||||
-----------
|
||||
cookies: Cookies
|
||||
The cookies used for the client.
|
||||
headers: Headers
|
||||
The headers used for the client.
|
||||
player_id: Optional[int]
|
||||
The player id used for the client.
|
||||
account_id: Optional[int]
|
||||
The account id used for the client.
|
||||
client: AsyncClient
|
||||
The httpx async client instance.
|
||||
region: Region
|
||||
The region used for the client.
|
||||
lang: str
|
||||
The language used for the client.
|
||||
"""
|
||||
|
||||
_device_id = str(uuid.uuid3(uuid.NAMESPACE_URL, "SIMNet"))
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cookies: Optional[CookieTypes] = None,
|
||||
headers: Optional[HeaderTypes] = None,
|
||||
account_id: Optional[int] = None,
|
||||
player_id: Optional[int] = None,
|
||||
region: Region = Region.OVERSEAS,
|
||||
lang: str = "en-us",
|
||||
) -> None:
|
||||
"""Initialize the client with the given parameters."""
|
||||
self.cookies = Cookies(cookies)
|
||||
self.headers = Headers(headers)
|
||||
self.player_id = player_id
|
||||
self.account_id = account_id
|
||||
self.client = AsyncClient(cookies=self.cookies)
|
||||
self.region = region
|
||||
self.lang = lang
|
||||
|
||||
def get_player_id(self) -> Optional[int]:
|
||||
"""Get the player id used for the client."""
|
||||
player_id = self.player_id or self.cookies.account_id
|
||||
return player_id
|
||||
|
||||
@property
|
||||
def device_name(self) -> str:
|
||||
"""Get the device name used for the client."""
|
||||
return "SIMNet Build 114514"
|
||||
|
||||
@property
|
||||
def device_id(self) -> str:
|
||||
"""Get the device id used for the client."""
|
||||
if self.account_id is not None:
|
||||
return str(uuid.uuid3(uuid.NAMESPACE_URL, str(self.account_id)))
|
||||
return self._device_id
|
||||
|
||||
@property
|
||||
def app_version(self) -> str:
|
||||
"""Get the app version used for the client."""
|
||||
if self.region == Region.CHINESE:
|
||||
return "2.46.1"
|
||||
if self.region == Region.OVERSEAS:
|
||||
return "1.5.0"
|
||||
return "null"
|
||||
|
||||
@property
|
||||
def client_type(self) -> str:
|
||||
"""Get the client type used for the client."""
|
||||
if self.region == Region.CHINESE:
|
||||
return "5"
|
||||
if self.region == Region.OVERSEAS:
|
||||
return "5"
|
||||
return "null"
|
||||
|
||||
@property
|
||||
def user_agent(self) -> str:
|
||||
"""Get the user agent used for the client."""
|
||||
if self.region == Region.CHINESE:
|
||||
return (
|
||||
f"Mozilla/5.0 (Linux; {self.device_name}) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/111.0.5563.116 Mobile Safari/537.36 "
|
||||
f"miHoYoBBS/{self.app_version}"
|
||||
)
|
||||
return (
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.5563.116 Safari/537.36"
|
||||
)
|
||||
|
||||
async def __aenter__(self: RT) -> RT:
|
||||
"""Enter the async context manager and initialize the client."""
|
||||
try:
|
||||
await self.initialize()
|
||||
return self
|
||||
except Exception as exc:
|
||||
await self.shutdown()
|
||||
raise exc
|
||||
|
||||
async def __aexit__(
|
||||
self,
|
||||
exc_type: Optional[Type[BaseException]],
|
||||
exc_val: Optional[BaseException],
|
||||
exc_tb: Optional[TracebackType],
|
||||
) -> None:
|
||||
"""Exit the async context manager and shutdown the client."""
|
||||
await self.shutdown()
|
||||
|
||||
async def shutdown(self):
|
||||
"""Shutdown the client."""
|
||||
if self.client.is_closed:
|
||||
_LOGGER.info("This Client is already shut down. Returning.")
|
||||
return
|
||||
|
||||
await self.client.aclose()
|
||||
|
||||
async def initialize(self):
|
||||
"""Initialize the client."""
|
||||
|
||||
def get_default_header(self, header: HeaderTypes):
|
||||
"""Get the default header for API requests.
|
||||
|
||||
Args:
|
||||
header (HeaderTypes): The header to use.
|
||||
|
||||
Returns:
|
||||
Headers: The default header with added fields.
|
||||
"""
|
||||
header = Headers(header)
|
||||
header["user-agent"] = self.user_agent
|
||||
header["x-rpc-app_version"] = self.app_version
|
||||
header["x-rpc-client_type"] = self.client_type
|
||||
header["x-rpc-device_id"] = self.device_id
|
||||
return header
|
||||
|
||||
def get_lab_api_header(
|
||||
self,
|
||||
header: HeaderTypes,
|
||||
lang: Optional[str] = None,
|
||||
ds: str = None,
|
||||
ds_type: str = None,
|
||||
new_ds: bool = False,
|
||||
data: Any = None,
|
||||
params: Optional[QueryParamTypes] = None,
|
||||
):
|
||||
"""Get the lab API header for API requests.
|
||||
|
||||
Args:
|
||||
header (HeaderTypes): The header to use.
|
||||
lang (Optional[str], optional): The language to use for overseas regions. Defaults to None.
|
||||
ds (str, optional): The DS string to use. Defaults to None.
|
||||
ds_type (str, optional): The DS type to use. Defaults to None.
|
||||
new_ds (bool, optional): Whether to generate a new DS. Defaults to False.
|
||||
data (Any, optional): The data to use. Defaults to None.
|
||||
params (Optional[QueryParamTypes], optional): The query parameters to use. Defaults to None.
|
||||
Returns:
|
||||
Headers: The lab API header with added fields.
|
||||
"""
|
||||
header = Headers(header)
|
||||
header["user-agent"] = self.user_agent
|
||||
header["x-rpc-app_version"] = self.app_version
|
||||
header["x-rpc-client_type"] = self.client_type
|
||||
header["x-rpc-device_id"] = self.device_id
|
||||
if self.region == Region.OVERSEAS:
|
||||
header["x-rpc-language"] = self.lang or lang
|
||||
if ds is None:
|
||||
app_version, client_type, ds = generate_dynamic_secret(
|
||||
self.region, ds_type, new_ds, data, params
|
||||
)
|
||||
header["x-rpc-app_version"] = app_version
|
||||
header["x-rpc-client_type"] = client_type
|
||||
header["DS"] = ds
|
||||
return header
|
||||
|
||||
async def request(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
data: Optional[RequestData] = None,
|
||||
json: Optional[Any] = None,
|
||||
params: Optional[QueryParamTypes] = None,
|
||||
headers: Optional[HeaderTypes] = None,
|
||||
) -> Response:
|
||||
"""Make an HTTP request and return the response.
|
||||
|
||||
This method makes an HTTP request with the specified HTTP method, URL, request parameters, headers,
|
||||
and JSON payload. It catches common HTTP errors and raises a `NetworkError` or `TimedOut` exception
|
||||
if the request times out.
|
||||
|
||||
Args:
|
||||
method (str): The HTTP method to use for the request (e.g., "GET", "POST").
|
||||
url (str): The URL to send the request to.
|
||||
data (Optional[RequestData]): The request data to include in the body of the request.
|
||||
json (Optional[Any]): The JSON payload to include in the body of the request.
|
||||
params (Optional[QueryParamTypes]): The query parameters to include in the request.
|
||||
headers (Optional[HeaderTypes]): The headers to include in the request.
|
||||
|
||||
Returns:
|
||||
Response: A `Response` object representing the HTTP response.
|
||||
|
||||
Raises:
|
||||
NetworkError: If an HTTP error occurs while making the request.
|
||||
TimedOut: If the request times out.
|
||||
|
||||
"""
|
||||
try:
|
||||
return await self.client.request(
|
||||
method,
|
||||
url,
|
||||
data=data,
|
||||
json=json,
|
||||
params=params,
|
||||
headers=headers,
|
||||
)
|
||||
except TimeoutException as exc:
|
||||
raise TimedOut from exc
|
||||
except HTTPError as exc:
|
||||
raise NetworkError from exc
|
||||
|
||||
async def request_api(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
json: Optional[Any] = None,
|
||||
params: Optional[QueryParamTypes] = None,
|
||||
headers: Optional[HeaderTypes] = None,
|
||||
):
|
||||
"""Make an API request and return the data.
|
||||
|
||||
This method makes an API request using the `request()` method
|
||||
and returns the data from the response if it is successful.
|
||||
If the response contains an error, it raises a `BadRequest` exception.
|
||||
|
||||
Args:
|
||||
method (str): The HTTP method to use for the request (e.g., "GET", "POST").
|
||||
url (str): The URL to send the request to.
|
||||
json (Optional[Any]): The JSON payload to include in the body of the request.
|
||||
params (Optional[QueryParamTypes]): The query parameters to include in the request.
|
||||
headers (Optional[HeaderTypes]): The headers to include in the request.
|
||||
|
||||
Returns:
|
||||
Any: The data returned by the API.
|
||||
|
||||
Raises:
|
||||
NetworkError: If an HTTP error occurs while making the request.
|
||||
TimedOut: If the request times out.
|
||||
BadRequest: If the response contains an error.
|
||||
"""
|
||||
response = await self.request(
|
||||
method,
|
||||
url,
|
||||
json=json,
|
||||
params=params,
|
||||
headers=headers,
|
||||
)
|
||||
# if "application/json" in response.headers.get("Content-Type", ""):
|
||||
if not response.is_error:
|
||||
data = response.json()
|
||||
ret_code = data.get("retcode")
|
||||
if response.is_error or ret_code != 0:
|
||||
raise_for_ret_code(data)
|
||||
return data["data"]
|
||||
raise BadRequest(status_code=response.status_code, message=response.text)
|
||||
|
||||
async def request_lab(
|
||||
self,
|
||||
url: str,
|
||||
method: Optional[str] = None,
|
||||
data: Optional[Any] = None,
|
||||
params: Optional[QueryParamTypes] = None,
|
||||
headers: Optional[HeaderTypes] = None,
|
||||
lang: Optional[str] = None,
|
||||
new_ds: bool = False,
|
||||
ds_type: str = None,
|
||||
):
|
||||
"""Make a request to the lab API and return the data.
|
||||
|
||||
This method makes a request to the lab API using the `request_api()` method
|
||||
and returns the data from the response if it is successful.
|
||||
It also adds headers for the lab API and handles the case where the method is not specified.
|
||||
|
||||
Args:
|
||||
url (str): The URL to send the request to.
|
||||
method (Optional[str]): The HTTP method to use for the request (e.g., "GET", "POST").
|
||||
data (Optional[Any]): The JSON payload to include in the body of the request.
|
||||
params (Optional[QueryParamTypes]): The query parameters to include in the request.
|
||||
headers (Optional[HeaderTypes]): The headers to include in the request.
|
||||
lang (Optional[str]): The language of the request (e.g., "en", "zh").
|
||||
new_ds (bool): Whether to use a new dataset for the request.
|
||||
ds_type (str): The type of dataset to use for the request (e.g., "news", "qa").
|
||||
|
||||
Returns:
|
||||
Any: The data returned by the lab API.
|
||||
|
||||
"""
|
||||
if method is None:
|
||||
method = "POST" if data else "GET"
|
||||
headers = self.get_lab_api_header(
|
||||
headers, ds_type=ds_type, new_ds=new_ds, lang=lang, data=data, params=params
|
||||
)
|
||||
return await self.request_api(
|
||||
method=method, url=url, json=data, params=params, headers=headers
|
||||
)
|
25
simnet/client/cookies.py
Normal file
25
simnet/client/cookies.py
Normal file
@ -0,0 +1,25 @@
|
||||
from typing import Optional
|
||||
|
||||
from httpx import Cookies as _Cookies
|
||||
|
||||
|
||||
class Cookies(_Cookies):
|
||||
"""An extension of the `httpx.Cookies` class that includes additional functionality."""
|
||||
|
||||
COOKIE_USER_ID_NAMES = ("ltuid", "account_id", "ltuid_v2", "account_id_v2")
|
||||
|
||||
@property
|
||||
def account_id(self) -> Optional[int]:
|
||||
"""Return the user account ID if present in the cookies.
|
||||
|
||||
If one of the user ID cookies exists in the cookies, return its integer value.
|
||||
Otherwise, return `None`.
|
||||
|
||||
Returns:
|
||||
Optional[int]: The user account ID, or `None` if it is not present in the cookies.
|
||||
"""
|
||||
for name in self.COOKIE_USER_ID_NAMES:
|
||||
value = self.get(name)
|
||||
if value is not None:
|
||||
return int(value)
|
||||
return None
|
5
simnet/client/headers.py
Normal file
5
simnet/client/headers.py
Normal file
@ -0,0 +1,5 @@
|
||||
from httpx import Headers as _Headers
|
||||
|
||||
|
||||
class Headers(_Headers):
|
||||
"""An extension of the `httpx.Headers` class that includes additional functionality."""
|
252
simnet/errors.py
Normal file
252
simnet/errors.py
Normal file
@ -0,0 +1,252 @@
|
||||
from typing import Any, Optional, Dict, Union, Tuple, Type, NoReturn
|
||||
|
||||
|
||||
class ApiHelperException(Exception):
|
||||
"""Base class for ApiHelper errors."""
|
||||
|
||||
|
||||
class NetworkError(ApiHelperException):
|
||||
"""Base class for exceptions due to networking errors."""
|
||||
|
||||
|
||||
class TimedOut(NetworkError):
|
||||
"""Raised when a request took too long to finish."""
|
||||
|
||||
|
||||
class BadRequest(ApiHelperException):
|
||||
"""Raised when an API request cannot be processed correctly.
|
||||
|
||||
Attributes:
|
||||
status_code (int): The status code of the response.
|
||||
ret_code (int): The error code of the response.
|
||||
original (str): The original error message of the response.
|
||||
message (str): The formatted error message of the response.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
response: Optional[Dict[str, Any]] = None,
|
||||
message: Optional[str] = None,
|
||||
status_code: Optional[int] = None,
|
||||
) -> None:
|
||||
self.status_code = status_code or 0
|
||||
self.ret_code = response.get("retcode", 0) if response else 0
|
||||
self.original = response.get("message", "") if response else ""
|
||||
self.message = message or self.original
|
||||
|
||||
display_code = self.ret_code or self.status_code
|
||||
display_message = (
|
||||
f"[{display_code}] {self.message}" if display_code else self.message
|
||||
)
|
||||
|
||||
super().__init__(display_message)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
response = {
|
||||
"status_code": self.status_code,
|
||||
"retcode": self.ret_code,
|
||||
"message": self.original,
|
||||
}
|
||||
return f"{type(self).__name__}({repr(response)})"
|
||||
|
||||
@property
|
||||
def response(self) -> dict[str, Union[str, Any, None]]:
|
||||
return {"retcode": self.ret_code, "message": self.original}
|
||||
|
||||
|
||||
class InternalDatabaseError(BadRequest):
|
||||
"""Internal database error."""
|
||||
|
||||
ret_code = -1
|
||||
|
||||
|
||||
class AccountNotFound(BadRequest):
|
||||
"""Tried to get data with an invalid uid."""
|
||||
|
||||
message = "Could not find user; uid may be invalid."
|
||||
|
||||
|
||||
class DataNotPublic(BadRequest):
|
||||
"""User hasn't set their data to public."""
|
||||
|
||||
message = "User's data is not public."
|
||||
|
||||
|
||||
class CookieException(BadRequest):
|
||||
"""Base error for cookies."""
|
||||
|
||||
|
||||
class InvalidCookies(CookieException):
|
||||
"""Cookies weren't valid."""
|
||||
|
||||
ret_code = -100
|
||||
message = "Cookies are not valid."
|
||||
|
||||
|
||||
class TooManyRequests(CookieException):
|
||||
"""Made too many requests and got ratelimited."""
|
||||
|
||||
ret_code = 10101
|
||||
message = "Cannot get data for more than 30 accounts per cookie per day."
|
||||
|
||||
|
||||
class VisitsTooFrequently(BadRequest):
|
||||
"""Visited a page too frequently.
|
||||
|
||||
Must be handled with exponential backoff.
|
||||
"""
|
||||
|
||||
ret_code = -110
|
||||
message = "Visits too frequently."
|
||||
|
||||
|
||||
class AlreadyClaimed(BadRequest):
|
||||
"""Already claimed the daily reward today."""
|
||||
|
||||
ret_code = -5003
|
||||
message = "Already claimed the daily reward today."
|
||||
|
||||
|
||||
class AuthkeyException(BadRequest):
|
||||
"""Base error for authkeys."""
|
||||
|
||||
|
||||
class InvalidAuthkey(AuthkeyException):
|
||||
"""Authkey is not valid."""
|
||||
|
||||
ret_code = -100
|
||||
message = "Authkey is not valid."
|
||||
|
||||
|
||||
class AuthkeyTimeout(AuthkeyException):
|
||||
"""Authkey has timed out."""
|
||||
|
||||
ret_code = -101
|
||||
message = "Authkey has timed out."
|
||||
|
||||
|
||||
class RedemptionException(BadRequest):
|
||||
"""Exception caused by redeeming a code."""
|
||||
|
||||
|
||||
class RedemptionInvalid(RedemptionException):
|
||||
"""Invalid redemption code."""
|
||||
|
||||
message = "Invalid redemption code."
|
||||
|
||||
|
||||
class RedemptionCooldown(RedemptionException):
|
||||
"""Redemption is on cooldown."""
|
||||
|
||||
message = "Redemption is on cooldown."
|
||||
|
||||
|
||||
class RedemptionClaimed(RedemptionException):
|
||||
"""Redemption code has been claimed already."""
|
||||
|
||||
message = "Redemption code has been claimed already."
|
||||
|
||||
|
||||
_TBR = Type[BadRequest]
|
||||
_errors: Dict[int, Union[_TBR, str, Tuple[_TBR, Optional[str]]]] = {
|
||||
# misc hoyolab
|
||||
-100: InvalidCookies,
|
||||
-108: "Invalid language.",
|
||||
-110: VisitsTooFrequently,
|
||||
# game record
|
||||
10001: InvalidCookies,
|
||||
-10001: "Malformed request.",
|
||||
-10002: "No genshin account associated with cookies.",
|
||||
# database game record
|
||||
10101: TooManyRequests,
|
||||
10102: DataNotPublic,
|
||||
10103: (
|
||||
InvalidCookies,
|
||||
"Cookies are valid but do not have a hoyolab account bound to them.",
|
||||
),
|
||||
10104: "Cannot view real-time notes of other users.",
|
||||
# calculator
|
||||
-500001: "Invalid fields in calculation.",
|
||||
-500004: VisitsTooFrequently,
|
||||
-502001: "User does not have this character.",
|
||||
-502002: "Calculator sync is not enabled.",
|
||||
# mixin
|
||||
-1: InternalDatabaseError,
|
||||
1009: AccountNotFound,
|
||||
# redemption
|
||||
-1065: RedemptionInvalid,
|
||||
-1071: InvalidCookies,
|
||||
-1073: (AccountNotFound, "Account has no game account bound to it."),
|
||||
-2001: (RedemptionInvalid, "Redemption code has expired."),
|
||||
-2003: (RedemptionInvalid, "Redemption code is incorrectly formatted."),
|
||||
-2004: RedemptionInvalid,
|
||||
-2014: (RedemptionInvalid, "Redemption code not activated"),
|
||||
-2016: RedemptionCooldown,
|
||||
-2017: RedemptionClaimed,
|
||||
-2018: RedemptionClaimed,
|
||||
-2021: (
|
||||
RedemptionException,
|
||||
"Cannot claim codes for accounts with adventure rank lower than 10.",
|
||||
),
|
||||
# rewards
|
||||
-5003: AlreadyClaimed,
|
||||
# chinese
|
||||
1008: AccountNotFound,
|
||||
-1104: "This action must be done in the app.",
|
||||
}
|
||||
|
||||
|
||||
ERRORS: Dict[int, Tuple[_TBR, Optional[str]]] = {
|
||||
ret_code: (
|
||||
(exc, None)
|
||||
if isinstance(exc, type)
|
||||
else (BadRequest, exc)
|
||||
if isinstance(exc, str)
|
||||
else exc
|
||||
)
|
||||
for ret_code, exc in _errors.items()
|
||||
}
|
||||
|
||||
|
||||
def raise_for_ret_code(data: Dict[str, Any]) -> NoReturn:
|
||||
"""Raise an equivalent error to a response.
|
||||
|
||||
Args:
|
||||
data (dict): The response data.
|
||||
|
||||
Raises:
|
||||
InvalidAuthkey: If the authkey is invalid.
|
||||
AuthkeyTimeout: If the authkey has timed out.
|
||||
AuthkeyException: If there is an authkey exception.
|
||||
RedemptionException: If there is a redemption exception.
|
||||
BadRequest: If there is a bad request.
|
||||
|
||||
game record:
|
||||
10001 = invalid cookie
|
||||
101xx = generic errors
|
||||
authkey:
|
||||
-100 = invalid authkey
|
||||
-101 = authkey timed out
|
||||
code redemption:
|
||||
20xx = invalid code or state
|
||||
-107x = invalid cookies
|
||||
daily reward:
|
||||
-500x = already claimed the daily reward
|
||||
"""
|
||||
r, m = data.get("retcode", 0), data.get("message", "")
|
||||
|
||||
if m.startswith("authkey"):
|
||||
if r == -100:
|
||||
raise InvalidAuthkey(data)
|
||||
if r == -101:
|
||||
raise AuthkeyTimeout(data)
|
||||
raise AuthkeyException(data)
|
||||
|
||||
if r in ERRORS:
|
||||
exc_type, msg = ERRORS[r]
|
||||
raise exc_type(data, msg)
|
||||
|
||||
if "redemption" in m:
|
||||
raise RedemptionException(data)
|
||||
|
||||
raise BadRequest(data)
|
23
simnet/utils/cookies.py
Normal file
23
simnet/utils/cookies.py
Normal file
@ -0,0 +1,23 @@
|
||||
"""A module for parsing cookies."""
|
||||
from http.cookies import SimpleCookie
|
||||
from typing import Dict
|
||||
|
||||
|
||||
def parse_cookie(cookie: str) -> Dict[str, str]:
|
||||
"""
|
||||
Parses a cookie or header into a dictionary of key-value pairs.
|
||||
|
||||
Args:
|
||||
cookie (str): The cookie or header to parse.
|
||||
|
||||
Returns:
|
||||
Dict[str, str]: A dictionary of key-value pairs representing the parsed cookie.
|
||||
|
||||
Example:
|
||||
>>> cookie = "sessionid=123456; expires=Fri, 31-Dec-2021 23:59:59 GMT; HttpOnly; Max-Age=31449600; Path=/"
|
||||
>>> parse_cookie(cookie)
|
||||
{'sessionid': '123456'}
|
||||
"""
|
||||
cookie = SimpleCookie(cookie)
|
||||
|
||||
return {str(k): v.value for k, v in cookie.items()}
|
106
simnet/utils/ds.py
Normal file
106
simnet/utils/ds.py
Normal file
@ -0,0 +1,106 @@
|
||||
import hashlib
|
||||
import json
|
||||
import random
|
||||
import string
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Any, Optional
|
||||
|
||||
from simnet.utils.enum_ import Region
|
||||
from simnet.utils.types import QueryParamTypes
|
||||
|
||||
|
||||
class DSType(Enum):
|
||||
"""
|
||||
Enumeration of dynamic secret types.
|
||||
|
||||
Attributes:
|
||||
ANDROID (str): Android dynamic secret type.
|
||||
ANDROID_NEW (str): New Android dynamic secret type.
|
||||
SIGN (str): Sign dynamic secret type.
|
||||
"""
|
||||
|
||||
ANDROID = "android"
|
||||
ANDROID_NEW = "android_new"
|
||||
SIGN = "sign"
|
||||
|
||||
|
||||
def hex_digest(text):
|
||||
"""
|
||||
Computes the MD5 hash digest of the given text.
|
||||
|
||||
Args:
|
||||
text (str): The text to hash.
|
||||
|
||||
Returns:
|
||||
str: The MD5 hash digest of the given text.
|
||||
"""
|
||||
_md5 = hashlib.md5() # nosec B303
|
||||
_md5.update(text.encode())
|
||||
return _md5.hexdigest()
|
||||
|
||||
|
||||
def generate_dynamic_secret(
|
||||
region: Region,
|
||||
ds_type: Optional[DSType] = None,
|
||||
new_ds: bool = False,
|
||||
data: Any = None,
|
||||
params: Optional[QueryParamTypes] = None,
|
||||
):
|
||||
"""
|
||||
Generates a dynamic secret.
|
||||
|
||||
Args:
|
||||
region (Region): The region for which to generate the dynamic secret.
|
||||
ds_type (Optional[DSType], optional): The dynamic secret type. Defaults to None.
|
||||
new_ds (bool, optional): Whether to generate a new or old dynamic secret. Defaults to False.
|
||||
data (Any, optional): The data to include in the dynamic secret. Defaults to None.
|
||||
params (Optional[QueryParamTypes], optional): The query parameters to include in the dynamic secret.
|
||||
Defaults to None.
|
||||
|
||||
Raises:
|
||||
ValueError: If the region or ds_type is not recognized.
|
||||
|
||||
Returns:
|
||||
Tuple[str, str, str]: A tuple containing the app version, client type, and dynamic secret.
|
||||
"""
|
||||
|
||||
def new():
|
||||
"""Create a new dynamic secret 2."""
|
||||
t = str(int(time.time()))
|
||||
r = str(random.randint(100001, 200000)) # nosec
|
||||
b = json.dumps(data) if data else ""
|
||||
q = "&".join(f"{k}={v}" for k, v in sorted(params.items())) if params else ""
|
||||
c = hex_digest(f"salt={salt}&t={t}&r={r}&b={b}&q={q}")
|
||||
return f"{t},{r},{c}"
|
||||
|
||||
def old():
|
||||
"""Create a new dynamic secret."""
|
||||
t = str(int(time.time()))
|
||||
r = "".join(random.sample(string.ascii_lowercase + string.digits, 6))
|
||||
c = hex_digest(f"salt={salt}&t={t}&r={r}")
|
||||
return f"{t},{r},{c}"
|
||||
|
||||
app_version = "2.46.1"
|
||||
client_type = "5"
|
||||
if region == Region.OVERSEAS:
|
||||
salt = "6s25p5ox5y14umn1p61aqyyvbvvl3lrt"
|
||||
app_version = "1.5.0"
|
||||
elif region == Region.CHINESE:
|
||||
if ds_type is None:
|
||||
salt = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
|
||||
elif ds_type == DSType.ANDROID:
|
||||
salt = "KZazpG4cO2QECFDBUCxdhS8cYCsQHfzn"
|
||||
client_type = "2"
|
||||
elif ds_type == DSType.ANDROID_NEW:
|
||||
client_type = "2"
|
||||
salt = "t0qEgfub6cvueAPgR5m9aQWWVciEer7v"
|
||||
else:
|
||||
raise ValueError(f"Unknown ds_type: {ds_type}")
|
||||
else:
|
||||
raise ValueError(f"Unknown region: {region}")
|
||||
if new_ds:
|
||||
ds = new()
|
||||
else:
|
||||
ds = old()
|
||||
return app_version, client_type, ds
|
29
simnet/utils/enum_.py
Normal file
29
simnet/utils/enum_.py
Normal file
@ -0,0 +1,29 @@
|
||||
import enum as _enum
|
||||
|
||||
|
||||
class Region(str, _enum.Enum):
|
||||
"""
|
||||
Represents a region where a game is being played.
|
||||
|
||||
Attributes:
|
||||
OVERSEAS (Region): Represents an overseas region where a game is being played.
|
||||
CHINESE (Region): Represents a Chinese region where a game is being played.
|
||||
"""
|
||||
|
||||
OVERSEAS = "os"
|
||||
CHINESE = "cn"
|
||||
|
||||
|
||||
class Game(str, _enum.Enum):
|
||||
"""
|
||||
Represents a game that can be played in different regions.
|
||||
|
||||
Attributes:
|
||||
GENSHIN (Game): Represents the game "Genshin Impact".
|
||||
HONKAI (Game): Represents the game "Honkai Impact 3rd".
|
||||
STARRAIL (Game): Represents the game "Honkai Impact 3rd RPG".
|
||||
"""
|
||||
|
||||
GENSHIN = "genshin"
|
||||
HONKAI = "honkai3rd"
|
||||
STARRAIL = "hkrpg"
|
15
simnet/utils/lang.py
Normal file
15
simnet/utils/lang.py
Normal file
@ -0,0 +1,15 @@
|
||||
def create_short_lang_code(lang: str) -> str:
|
||||
"""
|
||||
Create a short language code from a longer one.
|
||||
|
||||
This function takes a language code as input and returns a shortened version of it. If the language code contains
|
||||
"zh", the function returns the original code. Otherwise, the function returns the first part of the code (before the
|
||||
first hyphen, if there is one).
|
||||
|
||||
Args:
|
||||
lang (str): The language code to be shortened.
|
||||
|
||||
Returns:
|
||||
str: The shortened language code.
|
||||
"""
|
||||
return lang if "zh" in lang else lang.split("-")[0]
|
116
simnet/utils/player.py
Normal file
116
simnet/utils/player.py
Normal file
@ -0,0 +1,116 @@
|
||||
"""This module contains functions for recognizing servers associated with different player IDs."""
|
||||
|
||||
from typing import Optional, Mapping, Sequence
|
||||
from simnet.utils.enum_ import Game, Region
|
||||
|
||||
UID_RANGE: Mapping[Game, Mapping[Region, Sequence[int]]] = {
|
||||
Game.GENSHIN: {
|
||||
Region.OVERSEAS: (6, 7, 8, 9),
|
||||
Region.CHINESE: (1, 2, 5),
|
||||
},
|
||||
Game.STARRAIL: {
|
||||
Region.OVERSEAS: (6, 7, 8, 9),
|
||||
Region.CHINESE: (1, 2),
|
||||
},
|
||||
Game.HONKAI: {
|
||||
Region.OVERSEAS: (1, 2),
|
||||
Region.CHINESE: (3, 4),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def recognize_genshin_server(player_id: int) -> str:
|
||||
"""Recognize which server a Genshin UID is from.
|
||||
|
||||
Args:
|
||||
player_id (int): The player ID to recognize the server for.
|
||||
|
||||
Returns:
|
||||
str: The name of the server associated with the given player ID.
|
||||
|
||||
Raises:
|
||||
ValueError: If the player ID is not associated with any server.
|
||||
"""
|
||||
server = {
|
||||
"1": "cn_gf01",
|
||||
"2": "cn_gf01",
|
||||
"5": "cn_qd01",
|
||||
"6": "os_usa",
|
||||
"7": "os_euro",
|
||||
"8": "os_asia",
|
||||
"9": "os_cht",
|
||||
}.get(str(player_id)[0])
|
||||
|
||||
if server:
|
||||
return server
|
||||
|
||||
raise ValueError(f"Player id {player_id} isn't associated with any server")
|
||||
|
||||
|
||||
def recognize_starrail_server(player_id: int) -> str:
|
||||
"""Recognize which server a StarRail UID is from.
|
||||
|
||||
Args:
|
||||
player_id (int): The player ID to recognize the server for.
|
||||
|
||||
Returns:
|
||||
str: The name of the server associated with the given player ID.
|
||||
|
||||
Raises:
|
||||
ValueError: If the player ID is not associated with any server.
|
||||
"""
|
||||
server = {
|
||||
"1": "prod_gf_cn",
|
||||
"2": "prod_gf_cn",
|
||||
"5": "",
|
||||
"6": "",
|
||||
"7": "",
|
||||
"8": "",
|
||||
"9": "",
|
||||
}.get(str(player_id)[0])
|
||||
|
||||
if server:
|
||||
return server
|
||||
|
||||
raise ValueError(f"player id {player_id} isn't associated with any server")
|
||||
|
||||
|
||||
def recognize_region(player_id: int, game: Game) -> Optional[Region]:
|
||||
"""
|
||||
Recognizes the region of a player ID for a given game.
|
||||
|
||||
Args:
|
||||
player_id (int): The player ID to recognize the region for.
|
||||
game (Game): The game the player ID belongs to.
|
||||
|
||||
Returns:
|
||||
Optional[Region]: The region the player ID belongs to if it can be recognized, None otherwise.
|
||||
"""
|
||||
first = int(str(player_id)[0])
|
||||
|
||||
for region, digits in UID_RANGE[game].items():
|
||||
if first in digits:
|
||||
return region
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def recognize_server(player_id: int, game: Game) -> str:
|
||||
"""
|
||||
Recognizes the server of a player ID for a given game.
|
||||
|
||||
Args:
|
||||
player_id (int): The player ID to recognize the server for.
|
||||
game (Game): The game the player ID belongs to.
|
||||
|
||||
Returns:
|
||||
str: The server the player ID belongs to.
|
||||
|
||||
Raises:
|
||||
ValueError: If the specified game is not supported.
|
||||
"""
|
||||
if game == Game.GENSHIN:
|
||||
return recognize_genshin_server(player_id)
|
||||
if game == Game.STARRAIL:
|
||||
return recognize_starrail_server(player_id)
|
||||
raise ValueError(f"{game} is not a valid game")
|
23
simnet/utils/types.py
Normal file
23
simnet/utils/types.py
Normal file
@ -0,0 +1,23 @@
|
||||
from typing import TypeVar, Union, Mapping, Optional, Sequence, Dict, List, Tuple, Any
|
||||
|
||||
RT = TypeVar("RT", bound="BaseClient")
|
||||
|
||||
|
||||
CookieTypes = Union["Cookie", Dict[str, str], List[Tuple[str, str]]]
|
||||
RequestData = Mapping[str, Any]
|
||||
PrimitiveData = Optional[Union[str, int, float, bool]]
|
||||
QueryParamTypes = Union[
|
||||
"QueryParams",
|
||||
Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]],
|
||||
List[Tuple[str, PrimitiveData]],
|
||||
Tuple[Tuple[str, PrimitiveData], ...],
|
||||
str,
|
||||
bytes,
|
||||
]
|
||||
HeaderTypes = Union[
|
||||
"Headers",
|
||||
Mapping[str, str],
|
||||
Mapping[bytes, bytes],
|
||||
Sequence[Tuple[str, str]],
|
||||
Sequence[Tuple[bytes, bytes]],
|
||||
]
|
Loading…
Reference in New Issue
Block a user