From 90ee5725e0c65523011974e834f3c4d0a1d313be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=9B=E6=B0=B4=E5=B1=85=E5=AE=A4?= Date: Mon, 1 May 2023 17:30:57 +0800 Subject: [PATCH] :sparkles: Add Base Client --- simnet/client/base.py | 342 +++++++++++++++++++++++++++++++++++++++ simnet/client/cookies.py | 25 +++ simnet/client/headers.py | 5 + simnet/errors.py | 252 +++++++++++++++++++++++++++++ simnet/utils/cookies.py | 23 +++ simnet/utils/ds.py | 106 ++++++++++++ simnet/utils/enum_.py | 29 ++++ simnet/utils/lang.py | 15 ++ simnet/utils/player.py | 116 +++++++++++++ simnet/utils/types.py | 23 +++ 10 files changed, 936 insertions(+) create mode 100644 simnet/client/base.py create mode 100644 simnet/client/cookies.py create mode 100644 simnet/client/headers.py create mode 100644 simnet/errors.py create mode 100644 simnet/utils/cookies.py create mode 100644 simnet/utils/ds.py create mode 100644 simnet/utils/enum_.py create mode 100644 simnet/utils/lang.py create mode 100644 simnet/utils/player.py create mode 100644 simnet/utils/types.py diff --git a/simnet/client/base.py b/simnet/client/base.py new file mode 100644 index 0000000..7ed041f --- /dev/null +++ b/simnet/client/base.py @@ -0,0 +1,342 @@ +import logging +import uuid +from types import TracebackType +from typing import AsyncContextManager, Type, Optional, Any + +from httpx import AsyncClient, TimeoutException, Response, HTTPError + +from simnet.client.cookies import Cookies +from simnet.client.headers import Headers +from simnet.errors import TimedOut, NetworkError, BadRequest, raise_for_ret_code +from simnet.utils.ds import generate_dynamic_secret +from simnet.utils.enum_ import Region +from simnet.utils.types import ( + RT, + HeaderTypes, + CookieTypes, + RequestData, + QueryParamTypes, +) + +_LOGGER = logging.getLogger("simnet.BaseClient") + + +class BaseClient(AsyncContextManager["BaseClient"]): + """ + This is the base class for simnet clients. It provides common methods and properties for simnet clients. + + Parameters: + ----------- + cookies: Optional[CookieTypes] + The cookies used for the client. + headers: Optional[HeaderTypes] + The headers used for the client. + account_id: Optional[int] + The account id used for the client. + player_id: Optional[int] + The player id used for the client. + region: Region + The region used for the client. + lang: str + The language used for the client. + + Attributes: + ----------- + cookies: Cookies + The cookies used for the client. + headers: Headers + The headers used for the client. + player_id: Optional[int] + The player id used for the client. + account_id: Optional[int] + The account id used for the client. + client: AsyncClient + The httpx async client instance. + region: Region + The region used for the client. + lang: str + The language used for the client. + """ + + _device_id = str(uuid.uuid3(uuid.NAMESPACE_URL, "SIMNet")) + + def __init__( + self, + cookies: Optional[CookieTypes] = None, + headers: Optional[HeaderTypes] = None, + account_id: Optional[int] = None, + player_id: Optional[int] = None, + region: Region = Region.OVERSEAS, + lang: str = "en-us", + ) -> None: + """Initialize the client with the given parameters.""" + self.cookies = Cookies(cookies) + self.headers = Headers(headers) + self.player_id = player_id + self.account_id = account_id + self.client = AsyncClient(cookies=self.cookies) + self.region = region + self.lang = lang + + def get_player_id(self) -> Optional[int]: + """Get the player id used for the client.""" + player_id = self.player_id or self.cookies.account_id + return player_id + + @property + def device_name(self) -> str: + """Get the device name used for the client.""" + return "SIMNet Build 114514" + + @property + def device_id(self) -> str: + """Get the device id used for the client.""" + if self.account_id is not None: + return str(uuid.uuid3(uuid.NAMESPACE_URL, str(self.account_id))) + return self._device_id + + @property + def app_version(self) -> str: + """Get the app version used for the client.""" + if self.region == Region.CHINESE: + return "2.46.1" + if self.region == Region.OVERSEAS: + return "1.5.0" + return "null" + + @property + def client_type(self) -> str: + """Get the client type used for the client.""" + if self.region == Region.CHINESE: + return "5" + if self.region == Region.OVERSEAS: + return "5" + return "null" + + @property + def user_agent(self) -> str: + """Get the user agent used for the client.""" + if self.region == Region.CHINESE: + return ( + f"Mozilla/5.0 (Linux; {self.device_name}) " + "AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/111.0.5563.116 Mobile Safari/537.36 " + f"miHoYoBBS/{self.app_version}" + ) + return ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.5563.116 Safari/537.36" + ) + + async def __aenter__(self: RT) -> RT: + """Enter the async context manager and initialize the client.""" + try: + await self.initialize() + return self + except Exception as exc: + await self.shutdown() + raise exc + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """Exit the async context manager and shutdown the client.""" + await self.shutdown() + + async def shutdown(self): + """Shutdown the client.""" + if self.client.is_closed: + _LOGGER.info("This Client is already shut down. Returning.") + return + + await self.client.aclose() + + async def initialize(self): + """Initialize the client.""" + + def get_default_header(self, header: HeaderTypes): + """Get the default header for API requests. + + Args: + header (HeaderTypes): The header to use. + + Returns: + Headers: The default header with added fields. + """ + header = Headers(header) + header["user-agent"] = self.user_agent + header["x-rpc-app_version"] = self.app_version + header["x-rpc-client_type"] = self.client_type + header["x-rpc-device_id"] = self.device_id + return header + + def get_lab_api_header( + self, + header: HeaderTypes, + lang: Optional[str] = None, + ds: str = None, + ds_type: str = None, + new_ds: bool = False, + data: Any = None, + params: Optional[QueryParamTypes] = None, + ): + """Get the lab API header for API requests. + + Args: + header (HeaderTypes): The header to use. + lang (Optional[str], optional): The language to use for overseas regions. Defaults to None. + ds (str, optional): The DS string to use. Defaults to None. + ds_type (str, optional): The DS type to use. Defaults to None. + new_ds (bool, optional): Whether to generate a new DS. Defaults to False. + data (Any, optional): The data to use. Defaults to None. + params (Optional[QueryParamTypes], optional): The query parameters to use. Defaults to None. + Returns: + Headers: The lab API header with added fields. + """ + header = Headers(header) + header["user-agent"] = self.user_agent + header["x-rpc-app_version"] = self.app_version + header["x-rpc-client_type"] = self.client_type + header["x-rpc-device_id"] = self.device_id + if self.region == Region.OVERSEAS: + header["x-rpc-language"] = self.lang or lang + if ds is None: + app_version, client_type, ds = generate_dynamic_secret( + self.region, ds_type, new_ds, data, params + ) + header["x-rpc-app_version"] = app_version + header["x-rpc-client_type"] = client_type + header["DS"] = ds + return header + + async def request( + self, + method: str, + url: str, + data: Optional[RequestData] = None, + json: Optional[Any] = None, + params: Optional[QueryParamTypes] = None, + headers: Optional[HeaderTypes] = None, + ) -> Response: + """Make an HTTP request and return the response. + + This method makes an HTTP request with the specified HTTP method, URL, request parameters, headers, + and JSON payload. It catches common HTTP errors and raises a `NetworkError` or `TimedOut` exception + if the request times out. + + Args: + method (str): The HTTP method to use for the request (e.g., "GET", "POST"). + url (str): The URL to send the request to. + data (Optional[RequestData]): The request data to include in the body of the request. + json (Optional[Any]): The JSON payload to include in the body of the request. + params (Optional[QueryParamTypes]): The query parameters to include in the request. + headers (Optional[HeaderTypes]): The headers to include in the request. + + Returns: + Response: A `Response` object representing the HTTP response. + + Raises: + NetworkError: If an HTTP error occurs while making the request. + TimedOut: If the request times out. + + """ + try: + return await self.client.request( + method, + url, + data=data, + json=json, + params=params, + headers=headers, + ) + except TimeoutException as exc: + raise TimedOut from exc + except HTTPError as exc: + raise NetworkError from exc + + async def request_api( + self, + method: str, + url: str, + json: Optional[Any] = None, + params: Optional[QueryParamTypes] = None, + headers: Optional[HeaderTypes] = None, + ): + """Make an API request and return the data. + + This method makes an API request using the `request()` method + and returns the data from the response if it is successful. + If the response contains an error, it raises a `BadRequest` exception. + + Args: + method (str): The HTTP method to use for the request (e.g., "GET", "POST"). + url (str): The URL to send the request to. + json (Optional[Any]): The JSON payload to include in the body of the request. + params (Optional[QueryParamTypes]): The query parameters to include in the request. + headers (Optional[HeaderTypes]): The headers to include in the request. + + Returns: + Any: The data returned by the API. + + Raises: + NetworkError: If an HTTP error occurs while making the request. + TimedOut: If the request times out. + BadRequest: If the response contains an error. + """ + response = await self.request( + method, + url, + json=json, + params=params, + headers=headers, + ) + # if "application/json" in response.headers.get("Content-Type", ""): + if not response.is_error: + data = response.json() + ret_code = data.get("retcode") + if response.is_error or ret_code != 0: + raise_for_ret_code(data) + return data["data"] + raise BadRequest(status_code=response.status_code, message=response.text) + + async def request_lab( + self, + url: str, + method: Optional[str] = None, + data: Optional[Any] = None, + params: Optional[QueryParamTypes] = None, + headers: Optional[HeaderTypes] = None, + lang: Optional[str] = None, + new_ds: bool = False, + ds_type: str = None, + ): + """Make a request to the lab API and return the data. + + This method makes a request to the lab API using the `request_api()` method + and returns the data from the response if it is successful. + It also adds headers for the lab API and handles the case where the method is not specified. + + Args: + url (str): The URL to send the request to. + method (Optional[str]): The HTTP method to use for the request (e.g., "GET", "POST"). + data (Optional[Any]): The JSON payload to include in the body of the request. + params (Optional[QueryParamTypes]): The query parameters to include in the request. + headers (Optional[HeaderTypes]): The headers to include in the request. + lang (Optional[str]): The language of the request (e.g., "en", "zh"). + new_ds (bool): Whether to use a new dataset for the request. + ds_type (str): The type of dataset to use for the request (e.g., "news", "qa"). + + Returns: + Any: The data returned by the lab API. + + """ + if method is None: + method = "POST" if data else "GET" + headers = self.get_lab_api_header( + headers, ds_type=ds_type, new_ds=new_ds, lang=lang, data=data, params=params + ) + return await self.request_api( + method=method, url=url, json=data, params=params, headers=headers + ) diff --git a/simnet/client/cookies.py b/simnet/client/cookies.py new file mode 100644 index 0000000..865b744 --- /dev/null +++ b/simnet/client/cookies.py @@ -0,0 +1,25 @@ +from typing import Optional + +from httpx import Cookies as _Cookies + + +class Cookies(_Cookies): + """An extension of the `httpx.Cookies` class that includes additional functionality.""" + + COOKIE_USER_ID_NAMES = ("ltuid", "account_id", "ltuid_v2", "account_id_v2") + + @property + def account_id(self) -> Optional[int]: + """Return the user account ID if present in the cookies. + + If one of the user ID cookies exists in the cookies, return its integer value. + Otherwise, return `None`. + + Returns: + Optional[int]: The user account ID, or `None` if it is not present in the cookies. + """ + for name in self.COOKIE_USER_ID_NAMES: + value = self.get(name) + if value is not None: + return int(value) + return None diff --git a/simnet/client/headers.py b/simnet/client/headers.py new file mode 100644 index 0000000..debc26b --- /dev/null +++ b/simnet/client/headers.py @@ -0,0 +1,5 @@ +from httpx import Headers as _Headers + + +class Headers(_Headers): + """An extension of the `httpx.Headers` class that includes additional functionality.""" diff --git a/simnet/errors.py b/simnet/errors.py new file mode 100644 index 0000000..7911755 --- /dev/null +++ b/simnet/errors.py @@ -0,0 +1,252 @@ +from typing import Any, Optional, Dict, Union, Tuple, Type, NoReturn + + +class ApiHelperException(Exception): + """Base class for ApiHelper errors.""" + + +class NetworkError(ApiHelperException): + """Base class for exceptions due to networking errors.""" + + +class TimedOut(NetworkError): + """Raised when a request took too long to finish.""" + + +class BadRequest(ApiHelperException): + """Raised when an API request cannot be processed correctly. + + Attributes: + status_code (int): The status code of the response. + ret_code (int): The error code of the response. + original (str): The original error message of the response. + message (str): The formatted error message of the response. + """ + + def __init__( + self, + response: Optional[Dict[str, Any]] = None, + message: Optional[str] = None, + status_code: Optional[int] = None, + ) -> None: + self.status_code = status_code or 0 + self.ret_code = response.get("retcode", 0) if response else 0 + self.original = response.get("message", "") if response else "" + self.message = message or self.original + + display_code = self.ret_code or self.status_code + display_message = ( + f"[{display_code}] {self.message}" if display_code else self.message + ) + + super().__init__(display_message) + + def __repr__(self) -> str: + response = { + "status_code": self.status_code, + "retcode": self.ret_code, + "message": self.original, + } + return f"{type(self).__name__}({repr(response)})" + + @property + def response(self) -> dict[str, Union[str, Any, None]]: + return {"retcode": self.ret_code, "message": self.original} + + +class InternalDatabaseError(BadRequest): + """Internal database error.""" + + ret_code = -1 + + +class AccountNotFound(BadRequest): + """Tried to get data with an invalid uid.""" + + message = "Could not find user; uid may be invalid." + + +class DataNotPublic(BadRequest): + """User hasn't set their data to public.""" + + message = "User's data is not public." + + +class CookieException(BadRequest): + """Base error for cookies.""" + + +class InvalidCookies(CookieException): + """Cookies weren't valid.""" + + ret_code = -100 + message = "Cookies are not valid." + + +class TooManyRequests(CookieException): + """Made too many requests and got ratelimited.""" + + ret_code = 10101 + message = "Cannot get data for more than 30 accounts per cookie per day." + + +class VisitsTooFrequently(BadRequest): + """Visited a page too frequently. + + Must be handled with exponential backoff. + """ + + ret_code = -110 + message = "Visits too frequently." + + +class AlreadyClaimed(BadRequest): + """Already claimed the daily reward today.""" + + ret_code = -5003 + message = "Already claimed the daily reward today." + + +class AuthkeyException(BadRequest): + """Base error for authkeys.""" + + +class InvalidAuthkey(AuthkeyException): + """Authkey is not valid.""" + + ret_code = -100 + message = "Authkey is not valid." + + +class AuthkeyTimeout(AuthkeyException): + """Authkey has timed out.""" + + ret_code = -101 + message = "Authkey has timed out." + + +class RedemptionException(BadRequest): + """Exception caused by redeeming a code.""" + + +class RedemptionInvalid(RedemptionException): + """Invalid redemption code.""" + + message = "Invalid redemption code." + + +class RedemptionCooldown(RedemptionException): + """Redemption is on cooldown.""" + + message = "Redemption is on cooldown." + + +class RedemptionClaimed(RedemptionException): + """Redemption code has been claimed already.""" + + message = "Redemption code has been claimed already." + + +_TBR = Type[BadRequest] +_errors: Dict[int, Union[_TBR, str, Tuple[_TBR, Optional[str]]]] = { + # misc hoyolab + -100: InvalidCookies, + -108: "Invalid language.", + -110: VisitsTooFrequently, + # game record + 10001: InvalidCookies, + -10001: "Malformed request.", + -10002: "No genshin account associated with cookies.", + # database game record + 10101: TooManyRequests, + 10102: DataNotPublic, + 10103: ( + InvalidCookies, + "Cookies are valid but do not have a hoyolab account bound to them.", + ), + 10104: "Cannot view real-time notes of other users.", + # calculator + -500001: "Invalid fields in calculation.", + -500004: VisitsTooFrequently, + -502001: "User does not have this character.", + -502002: "Calculator sync is not enabled.", + # mixin + -1: InternalDatabaseError, + 1009: AccountNotFound, + # redemption + -1065: RedemptionInvalid, + -1071: InvalidCookies, + -1073: (AccountNotFound, "Account has no game account bound to it."), + -2001: (RedemptionInvalid, "Redemption code has expired."), + -2003: (RedemptionInvalid, "Redemption code is incorrectly formatted."), + -2004: RedemptionInvalid, + -2014: (RedemptionInvalid, "Redemption code not activated"), + -2016: RedemptionCooldown, + -2017: RedemptionClaimed, + -2018: RedemptionClaimed, + -2021: ( + RedemptionException, + "Cannot claim codes for accounts with adventure rank lower than 10.", + ), + # rewards + -5003: AlreadyClaimed, + # chinese + 1008: AccountNotFound, + -1104: "This action must be done in the app.", +} + + +ERRORS: Dict[int, Tuple[_TBR, Optional[str]]] = { + ret_code: ( + (exc, None) + if isinstance(exc, type) + else (BadRequest, exc) + if isinstance(exc, str) + else exc + ) + for ret_code, exc in _errors.items() +} + + +def raise_for_ret_code(data: Dict[str, Any]) -> NoReturn: + """Raise an equivalent error to a response. + + Args: + data (dict): The response data. + + Raises: + InvalidAuthkey: If the authkey is invalid. + AuthkeyTimeout: If the authkey has timed out. + AuthkeyException: If there is an authkey exception. + RedemptionException: If there is a redemption exception. + BadRequest: If there is a bad request. + + game record: + 10001 = invalid cookie + 101xx = generic errors + authkey: + -100 = invalid authkey + -101 = authkey timed out + code redemption: + 20xx = invalid code or state + -107x = invalid cookies + daily reward: + -500x = already claimed the daily reward + """ + r, m = data.get("retcode", 0), data.get("message", "") + + if m.startswith("authkey"): + if r == -100: + raise InvalidAuthkey(data) + if r == -101: + raise AuthkeyTimeout(data) + raise AuthkeyException(data) + + if r in ERRORS: + exc_type, msg = ERRORS[r] + raise exc_type(data, msg) + + if "redemption" in m: + raise RedemptionException(data) + + raise BadRequest(data) diff --git a/simnet/utils/cookies.py b/simnet/utils/cookies.py new file mode 100644 index 0000000..a85d113 --- /dev/null +++ b/simnet/utils/cookies.py @@ -0,0 +1,23 @@ +"""A module for parsing cookies.""" +from http.cookies import SimpleCookie +from typing import Dict + + +def parse_cookie(cookie: str) -> Dict[str, str]: + """ + Parses a cookie or header into a dictionary of key-value pairs. + + Args: + cookie (str): The cookie or header to parse. + + Returns: + Dict[str, str]: A dictionary of key-value pairs representing the parsed cookie. + + Example: + >>> cookie = "sessionid=123456; expires=Fri, 31-Dec-2021 23:59:59 GMT; HttpOnly; Max-Age=31449600; Path=/" + >>> parse_cookie(cookie) + {'sessionid': '123456'} + """ + cookie = SimpleCookie(cookie) + + return {str(k): v.value for k, v in cookie.items()} diff --git a/simnet/utils/ds.py b/simnet/utils/ds.py new file mode 100644 index 0000000..ff86a9c --- /dev/null +++ b/simnet/utils/ds.py @@ -0,0 +1,106 @@ +import hashlib +import json +import random +import string +import time +from enum import Enum +from typing import Any, Optional + +from simnet.utils.enum_ import Region +from simnet.utils.types import QueryParamTypes + + +class DSType(Enum): + """ + Enumeration of dynamic secret types. + + Attributes: + ANDROID (str): Android dynamic secret type. + ANDROID_NEW (str): New Android dynamic secret type. + SIGN (str): Sign dynamic secret type. + """ + + ANDROID = "android" + ANDROID_NEW = "android_new" + SIGN = "sign" + + +def hex_digest(text): + """ + Computes the MD5 hash digest of the given text. + + Args: + text (str): The text to hash. + + Returns: + str: The MD5 hash digest of the given text. + """ + _md5 = hashlib.md5() # nosec B303 + _md5.update(text.encode()) + return _md5.hexdigest() + + +def generate_dynamic_secret( + region: Region, + ds_type: Optional[DSType] = None, + new_ds: bool = False, + data: Any = None, + params: Optional[QueryParamTypes] = None, +): + """ + Generates a dynamic secret. + + Args: + region (Region): The region for which to generate the dynamic secret. + ds_type (Optional[DSType], optional): The dynamic secret type. Defaults to None. + new_ds (bool, optional): Whether to generate a new or old dynamic secret. Defaults to False. + data (Any, optional): The data to include in the dynamic secret. Defaults to None. + params (Optional[QueryParamTypes], optional): The query parameters to include in the dynamic secret. + Defaults to None. + + Raises: + ValueError: If the region or ds_type is not recognized. + + Returns: + Tuple[str, str, str]: A tuple containing the app version, client type, and dynamic secret. + """ + + def new(): + """Create a new dynamic secret 2.""" + t = str(int(time.time())) + r = str(random.randint(100001, 200000)) # nosec + b = json.dumps(data) if data else "" + q = "&".join(f"{k}={v}" for k, v in sorted(params.items())) if params else "" + c = hex_digest(f"salt={salt}&t={t}&r={r}&b={b}&q={q}") + return f"{t},{r},{c}" + + def old(): + """Create a new dynamic secret.""" + t = str(int(time.time())) + r = "".join(random.sample(string.ascii_lowercase + string.digits, 6)) + c = hex_digest(f"salt={salt}&t={t}&r={r}") + return f"{t},{r},{c}" + + app_version = "2.46.1" + client_type = "5" + if region == Region.OVERSEAS: + salt = "6s25p5ox5y14umn1p61aqyyvbvvl3lrt" + app_version = "1.5.0" + elif region == Region.CHINESE: + if ds_type is None: + salt = "xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs" + elif ds_type == DSType.ANDROID: + salt = "KZazpG4cO2QECFDBUCxdhS8cYCsQHfzn" + client_type = "2" + elif ds_type == DSType.ANDROID_NEW: + client_type = "2" + salt = "t0qEgfub6cvueAPgR5m9aQWWVciEer7v" + else: + raise ValueError(f"Unknown ds_type: {ds_type}") + else: + raise ValueError(f"Unknown region: {region}") + if new_ds: + ds = new() + else: + ds = old() + return app_version, client_type, ds diff --git a/simnet/utils/enum_.py b/simnet/utils/enum_.py new file mode 100644 index 0000000..90682cb --- /dev/null +++ b/simnet/utils/enum_.py @@ -0,0 +1,29 @@ +import enum as _enum + + +class Region(str, _enum.Enum): + """ + Represents a region where a game is being played. + + Attributes: + OVERSEAS (Region): Represents an overseas region where a game is being played. + CHINESE (Region): Represents a Chinese region where a game is being played. + """ + + OVERSEAS = "os" + CHINESE = "cn" + + +class Game(str, _enum.Enum): + """ + Represents a game that can be played in different regions. + + Attributes: + GENSHIN (Game): Represents the game "Genshin Impact". + HONKAI (Game): Represents the game "Honkai Impact 3rd". + STARRAIL (Game): Represents the game "Honkai Impact 3rd RPG". + """ + + GENSHIN = "genshin" + HONKAI = "honkai3rd" + STARRAIL = "hkrpg" diff --git a/simnet/utils/lang.py b/simnet/utils/lang.py new file mode 100644 index 0000000..c54d53d --- /dev/null +++ b/simnet/utils/lang.py @@ -0,0 +1,15 @@ +def create_short_lang_code(lang: str) -> str: + """ + Create a short language code from a longer one. + + This function takes a language code as input and returns a shortened version of it. If the language code contains + "zh", the function returns the original code. Otherwise, the function returns the first part of the code (before the + first hyphen, if there is one). + + Args: + lang (str): The language code to be shortened. + + Returns: + str: The shortened language code. + """ + return lang if "zh" in lang else lang.split("-")[0] diff --git a/simnet/utils/player.py b/simnet/utils/player.py new file mode 100644 index 0000000..013875b --- /dev/null +++ b/simnet/utils/player.py @@ -0,0 +1,116 @@ +"""This module contains functions for recognizing servers associated with different player IDs.""" + +from typing import Optional, Mapping, Sequence +from simnet.utils.enum_ import Game, Region + +UID_RANGE: Mapping[Game, Mapping[Region, Sequence[int]]] = { + Game.GENSHIN: { + Region.OVERSEAS: (6, 7, 8, 9), + Region.CHINESE: (1, 2, 5), + }, + Game.STARRAIL: { + Region.OVERSEAS: (6, 7, 8, 9), + Region.CHINESE: (1, 2), + }, + Game.HONKAI: { + Region.OVERSEAS: (1, 2), + Region.CHINESE: (3, 4), + }, +} + + +def recognize_genshin_server(player_id: int) -> str: + """Recognize which server a Genshin UID is from. + + Args: + player_id (int): The player ID to recognize the server for. + + Returns: + str: The name of the server associated with the given player ID. + + Raises: + ValueError: If the player ID is not associated with any server. + """ + server = { + "1": "cn_gf01", + "2": "cn_gf01", + "5": "cn_qd01", + "6": "os_usa", + "7": "os_euro", + "8": "os_asia", + "9": "os_cht", + }.get(str(player_id)[0]) + + if server: + return server + + raise ValueError(f"Player id {player_id} isn't associated with any server") + + +def recognize_starrail_server(player_id: int) -> str: + """Recognize which server a StarRail UID is from. + + Args: + player_id (int): The player ID to recognize the server for. + + Returns: + str: The name of the server associated with the given player ID. + + Raises: + ValueError: If the player ID is not associated with any server. + """ + server = { + "1": "prod_gf_cn", + "2": "prod_gf_cn", + "5": "", + "6": "", + "7": "", + "8": "", + "9": "", + }.get(str(player_id)[0]) + + if server: + return server + + raise ValueError(f"player id {player_id} isn't associated with any server") + + +def recognize_region(player_id: int, game: Game) -> Optional[Region]: + """ + Recognizes the region of a player ID for a given game. + + Args: + player_id (int): The player ID to recognize the region for. + game (Game): The game the player ID belongs to. + + Returns: + Optional[Region]: The region the player ID belongs to if it can be recognized, None otherwise. + """ + first = int(str(player_id)[0]) + + for region, digits in UID_RANGE[game].items(): + if first in digits: + return region + + return None + + +def recognize_server(player_id: int, game: Game) -> str: + """ + Recognizes the server of a player ID for a given game. + + Args: + player_id (int): The player ID to recognize the server for. + game (Game): The game the player ID belongs to. + + Returns: + str: The server the player ID belongs to. + + Raises: + ValueError: If the specified game is not supported. + """ + if game == Game.GENSHIN: + return recognize_genshin_server(player_id) + if game == Game.STARRAIL: + return recognize_starrail_server(player_id) + raise ValueError(f"{game} is not a valid game") diff --git a/simnet/utils/types.py b/simnet/utils/types.py new file mode 100644 index 0000000..b142069 --- /dev/null +++ b/simnet/utils/types.py @@ -0,0 +1,23 @@ +from typing import TypeVar, Union, Mapping, Optional, Sequence, Dict, List, Tuple, Any + +RT = TypeVar("RT", bound="BaseClient") + + +CookieTypes = Union["Cookie", Dict[str, str], List[Tuple[str, str]]] +RequestData = Mapping[str, Any] +PrimitiveData = Optional[Union[str, int, float, bool]] +QueryParamTypes = Union[ + "QueryParams", + Mapping[str, Union[PrimitiveData, Sequence[PrimitiveData]]], + List[Tuple[str, PrimitiveData]], + Tuple[Tuple[str, PrimitiveData], ...], + str, + bytes, +] +HeaderTypes = Union[ + "Headers", + Mapping[str, str], + Mapping[bytes, bytes], + Sequence[Tuple[str, str]], + Sequence[Tuple[bytes, bytes]], +]