From 876bc6c6bcae75d28235727fdebb8c762a15fa34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B4=9B=E6=B0=B4=E5=B1=85=E5=AE=A4?= Date: Tue, 21 Feb 2023 17:43:40 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refactor=20AuthClient?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apihelper/client/components/authclient.py | 227 ++++++++++++++++++ modules/apihelper/client/components/signin.py | 172 ------------- modules/apihelper/models/genshin/cookies.py | 63 +++++ plugins/genshin/cookies.py | 35 ++- 4 files changed, 307 insertions(+), 190 deletions(-) create mode 100644 modules/apihelper/client/components/authclient.py delete mode 100644 modules/apihelper/client/components/signin.py create mode 100644 modules/apihelper/models/genshin/cookies.py diff --git a/modules/apihelper/client/components/authclient.py b/modules/apihelper/client/components/authclient.py new file mode 100644 index 0000000..86a3c0f --- /dev/null +++ b/modules/apihelper/client/components/authclient.py @@ -0,0 +1,227 @@ +import asyncio +import json +import random +import qrcode + +from io import BytesIO +from string import ascii_letters, digits +from typing import Dict, Union, Optional, Tuple, Any +from httpx import AsyncClient + +from ...logger import logger +from ...models.genshin.cookies import CookiesModel +from ...utility.helpers import get_device_id, get_ds + +__all__ = ("AuthClient",) + + +class AuthClient: + player_id: Optional[int] = None + user_id: Optional[int] = None + cookies: Optional[CookiesModel] = None + device_id: Optional[str] = None + + USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15" + ) + PASSPORT_HOST = "passport-api.mihoyo.com" + HK4E_SDK_HOST = "hk4e-sdk.mihoyo.com" + TAKUMI_HOST = "api-takumi.mihoyo.com" + QRCODE_GEN_API = f"https://{HK4E_SDK_HOST}/hk4e_cn/combo/panda/qrcode/fetch" + QRCODE_GET_API = f"https://{HK4E_SDK_HOST}/hk4e_cn/combo/panda/qrcode/query" + GET_COOKIE_ACCOUNT_BY_GAME_TOKEN_API = f"https://{TAKUMI_HOST}/auth/api/getCookieAccountInfoByGameToken" + GET_TOKEN_BY_GAME_LTOKEN_API = f"https://{PASSPORT_HOST}/account/ma-cn-session/app/getTokenByGameToken" + GET_COOKIES_TOKEN_BY_STOKEN_API = f"https://{PASSPORT_HOST}/account/auth/api/getCookieAccountInfoBySToken" + GET_LTOKEN_BY_STOKEN_API = f"https://{PASSPORT_HOST}/account/auth/api/getLTokenBySToken" + get_STOKEN_URL = f"https://{TAKUMI_HOST}/auth/api/getMultiTokenByLoginTicket" + + def __init__( + self, + player_id: Optional[int] = None, + user_id: Optional[int] = None, + cookies: Optional[Union[CookiesModel, dict]] = None, + ): + self.client = AsyncClient() + self.player_id = player_id + if cookies is None: + self.cookies = CookiesModel() + else: + if isinstance(cookies, dict): + self.cookies = CookiesModel(**cookies) + elif isinstance(cookies, CookiesModel): + self.cookies = cookies + else: + raise RuntimeError + if user_id: + self.user_id = user_id + else: + self.user_id = self.cookies.user_id + + async def get_stoken_by_login_ticket(self) -> bool: + if self.cookies.login_ticket is None and self.user_id is None: + return False + params = {"login_ticket": self.cookies.login_ticket, "uid": self.user_id, "token_types": 3} + data = await self.client.get(self.get_STOKEN_URL, params=params, headers={"User-Agent": self.USER_AGENT}) + res_json = data.json() + res_data = res_json.get("data", {}).get("list", []) + for i in res_data: + name = i.get("name") + token = i.get("token") + if name and token: + if hasattr(self.cookies, name): + setattr(self.cookies, name, token) + if self.cookies.stoken: + if self.cookies.stuid: + self.cookies.stuid = self.user_id + return True + return False + + async def get_ltoken_by_game_token(self, game_token: str) -> bool: + if self.user_id is None: + return False + data = {"account_id": self.user_id, "game_token": game_token} + headers = { + "x-rpc-aigis": "", + "Content-Type": "application/json", + "Accept": "application/json", + "x-rpc-game_biz": "bbs_cn", + "x-rpc-sys_version": "11", + "x-rpc-device_id": get_device_id(self.USER_AGENT), + "x-rpc-device_fp": "".join(random.choices((ascii_letters + digits), k=13)), + "x-rpc-device_name": "Chrome 108.0.0.0", + "x-rpc-device_model": "Windows 10 64-bit", + "x-rpc-app_id": "bll8iq97cem8", + "User-Agent": "okhttp/4.8.0", + } + app_version, client_type, ds_sign = get_ds(new_ds=True, data=data) + headers["x-rpc-app_version"] = app_version + headers["x-rpc-client_type"] = client_type + headers["DS"] = ds_sign + res = await self.client.post( + self.GET_TOKEN_BY_GAME_LTOKEN_API, + headers=headers, + json={"account_id": self.user_id, "game_token": game_token}, + ) + ltoken_data = res.json() + self.cookies.ltmid_v2 = ltoken_data["data"]["user_info"]["mid"] + self.cookies.ltoken_v2 = ltoken_data["data"]["token"]["token"] + return True + + async def create_qrcode_login(self) -> tuple[str, str]: + self.device_id = get_device_id("".join(random.choices((ascii_letters + digits), k=64))) + data = {"app_id": "8", "device": self.device_id} + res = await self.client.post(self.QRCODE_GEN_API, json=data) + res_json = res.json() + url = res_json.get("data", {}).get("url", "") + if not url: + return "", "" + ticket = url.split("ticket=")[1] + return url, ticket + + async def _get_cookie_token_data(self, game_token: str, account_id: int) -> Dict: + res = await self.client.get( + self.GET_COOKIE_ACCOUNT_BY_GAME_TOKEN_API, + params={"game_token": game_token, "account_id": account_id}, + ) + return res.json() + + async def _set_cookie_by_game_token(self, data: Dict) -> bool: + game_token = json.loads(data.get("payload", {}).get("raw", "{}")) + if not game_token: + return False + uid = game_token["uid"] + self.user_id = int(uid) + cookie_token_data = await self._get_cookie_token_data(game_token["token"], self.user_id) + await self.get_ltoken_by_game_token(game_token["token"]) + cookie_token = cookie_token_data["data"]["cookie_token"] + self.cookies.cookie_token = cookie_token + self.cookies.account_id = game_token["uid"] + return True + + async def check_qrcode_login(self, ticket: str): + data = {"app_id": "8", "ticket": ticket, "device": self.device_id} + for _ in range(20): + await asyncio.sleep(10) + res = await self.client.post(self.QRCODE_GET_API, json=data) + res_json = res.json() + ret_code = res_json.get("retcode", 1) + if ret_code != 0: + logger.debug("QRCODE_GET_API: [%s]%s", res_json.get("retcode"), res_json.get("message")) + return False + logger.debug("QRCODE_GET_API: %s", res_json.get("data")) + res_data = res_json.get("data", {}) + if res_data.get("stat", "") == "Confirmed": + return await self._set_cookie_by_game_token(res_json.get("data", {})) + + async def get_cookie_token_by_stoken(self) -> bool: + if self.cookies.stoken is None: + return False + user_id = self.cookies.user_id + headers = { + "x-rpc-app_version": "2.11.1", + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1" + ), + "x-rpc-client_type": "5", + "Referer": "https://webstatic.mihoyo.com/", + "Origin": "https://webstatic.mihoyo.com", + } + params = { + "stoken": self.cookies.stoken, + "uid": user_id, + } + res = await self.client.get( + self.GET_COOKIES_TOKEN_BY_STOKEN_API, + headers=headers, + params=params, + ) + res_json = res.json() + cookie_token = res_json.get("data", {}).get("cookie_token", "") + if cookie_token: + self.cookies.cookie_token = cookie_token + self.cookies.account_id = user_id + return True + return False + + async def get_ltoken_by_stoken(self) -> bool: + if self.cookies.stoken is None: + return False + user_id = self.cookies.user_id + headers = { + "x-rpc-app_version": "2.11.1", + "User-Agent": ( + "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1" + ), + "x-rpc-client_type": "5", + "Referer": "https://webstatic.mihoyo.com/", + "Origin": "https://webstatic.mihoyo.com", + } + params = { + "stoken": self.cookies.stoken, + "uid": user_id, + } + res = await self.client.get( + self.GET_LTOKEN_BY_STOKEN_API, + headers=headers, + params=params, + ) + res_json = res.json() + ltoken = res_json.get("data", {}).get("ltoken", "") + if ltoken: + self.cookies.ltoken = ltoken + self.cookies.ltuid = user_id + return True + return False + + @staticmethod + def generate_qrcode(url: str) -> bytes: + qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4) + qr.add_data(url) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + bio = BytesIO() + img.save(bio, format="PNG") + return bio.getvalue() diff --git a/modules/apihelper/client/components/signin.py b/modules/apihelper/client/components/signin.py deleted file mode 100644 index beb769e..0000000 --- a/modules/apihelper/client/components/signin.py +++ /dev/null @@ -1,172 +0,0 @@ -import asyncio -import json -import random -import qrcode - -from io import BytesIO -from string import ascii_letters, digits -from typing import Dict -from httpx import AsyncClient - -from ...logger import logger -from ...utility.helpers import get_device_id, get_ds - -__all__ = ("SignIn",) - - -class SignIn: - S_TOKEN_URL = ( - "https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?login_ticket={0}&token_types=3&uid={1}" - ) - USER_AGENT = ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15" - ) - QRCODE_GEN_API = "https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/fetch" - QRCODE_GET_API = "https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/query" - GAME_TOKEN_API = "https://api-takumi.mihoyo.com/auth/api/getCookieAccountInfoByGameToken" - GAME_LTOKEN_API = "https://passport-api.mihoyo.com/account/ma-cn-session/app/getTokenByGameToken" - COOKIES_GET_API = "https://passport-api.mihoyo.com/account/auth/api/getCookieAccountInfoBySToken" - - def __init__(self, uid: int = 0, cookie: Dict = None): - self.client = AsyncClient() - self.uid = uid - self.cookie = cookie.copy() if cookie is not None else {} - self.parse_uid() - self.ticket = None - self.device_id = None - - def parse_uid(self): - """ - 从cookie中获取uid - :param self: - :return: - """ - if not self.cookie: - return - for item in ["login_uid", "stuid", "ltuid", "account_id"]: - if item in self.cookie: - self.uid = self.cookie[item] - break - for item in ["login_uid", "stuid", "ltuid", "account_id"]: - self.cookie[item] = self.uid - - async def get_s_token(self): - if not self.cookie.get("login_ticket") or not self.uid: - return - data = await self.client.get( - self.S_TOKEN_URL.format(self.cookie["login_ticket"], self.uid), headers={"User-Agent": self.USER_AGENT} - ) - res_json = data.json() - res_data = res_json.get("data", {}).get("list", []) - for i in res_data: - if i.get("name") and i.get("token"): - self.cookie[i.get("name")] = i.get("token") - - async def get_ltoken_by_game_token(self, game_token: str): - data = {"account_id": self.uid, "game_token": game_token} - headers = { - "x-rpc-aigis": "", - "Content-Type": "application/json", - "Accept": "application/json", - "x-rpc-game_biz": "bbs_cn", - "x-rpc-sys_version": "11", - "x-rpc-device_id": get_device_id(self.USER_AGENT), - "x-rpc-device_fp": "".join(random.choices((ascii_letters + digits), k=13)), - "x-rpc-device_name": "Chrome 108.0.0.0", - "x-rpc-device_model": "Windows 10 64-bit", - "x-rpc-app_id": "bll8iq97cem8", - "User-Agent": "okhttp/4.8.0", - } - app_version, client_type, ds_sign = get_ds(new_ds=True, data=data) - headers["x-rpc-app_version"] = app_version - headers["x-rpc-client_type"] = client_type - headers["DS"] = ds_sign - res = await self.client.post( - self.GAME_LTOKEN_API, - headers=headers, - json={"account_id": self.uid, "game_token": game_token}, - ) - return res.json() - - async def create_login_data(self) -> str: - self.device_id = get_device_id("".join(random.choices((ascii_letters + digits), k=64))) - data = {"app_id": "8", "device": self.device_id} - res = await self.client.post(self.QRCODE_GEN_API, json=data) - res_json = res.json() - url = res_json.get("data", {}).get("url", "") - if not url: - return "" - self.ticket = url.split("ticket=")[1] - return url - - async def get_cookie_token_data(self, game_token: str) -> Dict: - res = await self.client.get( - self.GAME_TOKEN_API, - params={"game_token": game_token, "account_id": self.uid}, - ) - return res.json() - - async def set_cookie(self, data: Dict) -> bool: - self.cookie = {} - game_token = json.loads(data.get("payload", {}).get("raw", "{}")) - if not game_token: - return False - self.uid = int(game_token["uid"]) - for item in ["login_uid", "stuid", "ltuid", "account_id"]: - self.cookie[item] = str(self.uid) - cookie_token_data = await self.get_cookie_token_data(game_token["token"]) - ltoken_data = await self.get_ltoken_by_game_token(game_token["token"]) - self.cookie["cookie_token"] = cookie_token_data["data"]["cookie_token"] - for item in ["account_mid_v2", "ltmid_v2"]: - self.cookie[item] = ltoken_data["data"]["user_info"]["mid"] - self.cookie["ltoken_v2"] = ltoken_data["data"]["token"]["token"] - return True - - async def check_login(self): - data = {"app_id": "8", "ticket": self.ticket, "device": self.device_id} - for _ in range(20): - await asyncio.sleep(10) - res = await self.client.post(self.QRCODE_GET_API, json=data) - res_json = res.json() - ret_code = res_json.get("retcode", 1) - if ret_code != 0: - logger.debug("QRCODE_GET_API: [%s]%s", res_json.get("retcode"), res_json.get("message")) - return False - logger.debug("QRCODE_GET_API: %s", res_json.get("data")) - res_data = res_json.get("data", {}) - if res_data.get("stat", "") == "Confirmed": - return await self.set_cookie(res_json.get("data", {})) - - async def get_cookie_account_info_by_stoken(self, stoken, uid): - headers = { - "x-rpc-app_version": "2.11.1", - "User-Agent": ( - "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) " - "AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1" - ), - "x-rpc-client_type": "5", - "Referer": "https://webstatic.mihoyo.com/", - "Origin": "https://webstatic.mihoyo.com", - } - params = { - "stoken": stoken, - "uid": uid, - } - res = await self.client.get( - self.COOKIES_GET_API, - headers=headers, - params=params, - ) - res_json = res.json() - return res_json.get("data", {}).get("cookie_token", "") - - @staticmethod - def generate_qrcode(url: str) -> bytes: - qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4) - qr.add_data(url) - qr.make(fit=True) - img = qr.make_image(fill_color="black", back_color="white") - bio = BytesIO() - img.save(bio, format="PNG") - return bio.getvalue() diff --git a/modules/apihelper/models/genshin/cookies.py b/modules/apihelper/models/genshin/cookies.py new file mode 100644 index 0000000..3b54dbc --- /dev/null +++ b/modules/apihelper/models/genshin/cookies.py @@ -0,0 +1,63 @@ +from typing import Optional, TypeVar + +from pydantic import BaseModel + +IntStr = TypeVar("IntStr", int, str) + +__all__ = ("CookiesModel",) + + +class CookiesModel(BaseModel): + login_uid: Optional[IntStr] = None + login_ticket: Optional[str] = None + + stoken: Optional[str] = None + stuid: Optional[IntStr] = None + + account_id: Optional[IntStr] = None + cookie_token: Optional[str] = None + + ltoken: Optional[str] = None + ltuid: Optional[IntStr] = None + + account_mid_v2: Optional[str] = None + cookie_token_v2: Optional[str] = None + + ltoken_v2: Optional[str] = None + ltmid_v2: Optional[str] = None + + @property + def is_v1(self) -> bool: + if self.account_id or self.cookie_token or self.ltoken or self.ltuid: + return True + return False + + @property + def is_v2(self) -> bool: + if self.account_mid_v2 or self.cookie_token_v2 or self.ltoken_v2 or self.ltmid_v2: + return True + return False + + def remove_v2(self): + self.account_mid_v2 = None + self.cookie_token_v2 = None + self.ltoken_v2 = None + self.ltmid_v2 = None + + def to_dict(self): + return self.dict(exclude_defaults=True) + + def to_json(self): + return self.json(exclude_defaults=True) + + @property + def user_id(self) -> Optional[int]: + if self.ltuid: + return self.ltuid + if self.account_id: + return self.account_id + if self.login_uid: + return self.login_uid + if self.stuid: + return self.stuid + return None diff --git a/plugins/genshin/cookies.py b/plugins/genshin/cookies.py index af43a2a..8d42762 100644 --- a/plugins/genshin/cookies.py +++ b/plugins/genshin/cookies.py @@ -12,13 +12,12 @@ from telegram.helpers import escape_markdown from core.baseplugin import BasePlugin from core.cookies.error import CookiesNotFoundError -from core.cookies.models import Cookies from core.cookies.services import CookiesService from core.plugin import Plugin, conversation, handler from core.user.error import UserNotFoundError from core.user.models import User from core.user.services import UserService -from modules.apihelper.client.components.signin import SignIn +from modules.apihelper.client.components.authclient import AuthClient from utils.decorators.error import error_callable from utils.decorators.restricts import restricts from utils.log import logger @@ -31,7 +30,6 @@ class AddUserCommandData(TelegramObject): cookies: dict = {} game_uid: int = 0 phone: int = 0 - sign_in_client: Optional[SignIn] = None CHECK_SERVER, INPUT_COOKIES, COMMAND_RESULT = range(10100, 10103) @@ -104,13 +102,13 @@ class SetUserCookies(Plugin.Conversation, BasePlugin.Conversation): else: await message.reply_text("警告,你已经绑定Cookie,如果继续操作会覆盖当前Cookie。") add_user_command_data.user = user_info - sign_in_client = SignIn() - url = await sign_in_client.create_login_data() - data = sign_in_client.generate_qrcode(url) + auth_client = AuthClient() + url, ticket = await auth_client.create_qrcode_login() + data = auth_client.generate_qrcode(url) text = f"你好 {user.mention_html()} !该绑定方法仅支持国服,请在3分钟内使用米游社扫码并确认进行绑定。" await message.reply_photo(data, caption=text, parse_mode=ParseMode.HTML) - if await sign_in_client.check_login(): - add_user_command_data.cookies = sign_in_client.cookie + if await auth_client.check_qrcode_login(ticket): + add_user_command_data.cookies = auth_client.cookies.to_dict() return await self.check_cookies(update, context) else: await message.reply_markdown_v2("可能是验证码已过期或者你没有同意授权,请重新发送命令进行绑定。") @@ -272,16 +270,17 @@ class SetUserCookies(Plugin.Conversation, BasePlugin.Conversation): return ConversationHandler.END with contextlib.suppress(Exception): if cookies.get("login_ticket"): - sign_in_client = SignIn(cookie=add_user_command_data.cookies) - await sign_in_client.get_s_token() - add_user_command_data.cookies = sign_in_client.cookie - logger.success("用户 %s[%s] 绑定时获取 stoken 成功", user.full_name, user.id) - stoken = add_user_command_data.cookies.get("stoken") - account_id = add_user_command_data.cookies.get("account_id") - if stoken and account_id: - cookie_token = await sign_in_client.get_cookie_account_info_by_stoken(stoken, account_id) - add_user_command_data.cookies["cookie_token"] = cookie_token - logger.success("用户 %s[%s] 绑定时获取 cookie_token 成功", user.full_name, user.id) + auth_client = AuthClient(cookies=add_user_command_data.cookies) + if await auth_client.get_stoken_by_login_ticket(): + logger.success("用户 %s[%s] 绑定时获取 stoken 成功", user.full_name, user.id) + add_user_command_data.cookies = auth_client.cookies.to_dict() + if await auth_client.get_cookie_token_by_stoken(): + logger.success("用户 %s[%s] 绑定时获取 cookie_token 成功", user.full_name, user.id) + add_user_command_data.cookies = auth_client.cookies.to_dict() + if await auth_client.get_ltoken_by_stoken(): + logger.success("用户 %s[%s] 绑定时获取 ltoken 成功", user.full_name, user.id) + auth_client.cookies.remove_v2() + add_user_command_data.cookies = auth_client.cookies.to_dict() user_info: Optional[GenshinAccount] = None level: int = 0 # todo : 多账号绑定