♻️ Refactor AuthClient

This commit is contained in:
洛水居室 2023-02-21 17:43:40 +08:00
parent f67eb74c04
commit 876bc6c6bc
No known key found for this signature in database
GPG Key ID: C9DE87DA724B88FC
4 changed files with 307 additions and 190 deletions

View File

@ -0,0 +1,227 @@
import asyncio
import json
import random
import qrcode
from io import BytesIO
from string import ascii_letters, digits
from typing import Dict, Union, Optional, Tuple, Any
from httpx import AsyncClient
from ...logger import logger
from ...models.genshin.cookies import CookiesModel
from ...utility.helpers import get_device_id, get_ds
__all__ = ("AuthClient",)
class AuthClient:
player_id: Optional[int] = None
user_id: Optional[int] = None
cookies: Optional[CookiesModel] = None
device_id: Optional[str] = None
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
)
PASSPORT_HOST = "passport-api.mihoyo.com"
HK4E_SDK_HOST = "hk4e-sdk.mihoyo.com"
TAKUMI_HOST = "api-takumi.mihoyo.com"
QRCODE_GEN_API = f"https://{HK4E_SDK_HOST}/hk4e_cn/combo/panda/qrcode/fetch"
QRCODE_GET_API = f"https://{HK4E_SDK_HOST}/hk4e_cn/combo/panda/qrcode/query"
GET_COOKIE_ACCOUNT_BY_GAME_TOKEN_API = f"https://{TAKUMI_HOST}/auth/api/getCookieAccountInfoByGameToken"
GET_TOKEN_BY_GAME_LTOKEN_API = f"https://{PASSPORT_HOST}/account/ma-cn-session/app/getTokenByGameToken"
GET_COOKIES_TOKEN_BY_STOKEN_API = f"https://{PASSPORT_HOST}/account/auth/api/getCookieAccountInfoBySToken"
GET_LTOKEN_BY_STOKEN_API = f"https://{PASSPORT_HOST}/account/auth/api/getLTokenBySToken"
get_STOKEN_URL = f"https://{TAKUMI_HOST}/auth/api/getMultiTokenByLoginTicket"
def __init__(
self,
player_id: Optional[int] = None,
user_id: Optional[int] = None,
cookies: Optional[Union[CookiesModel, dict]] = None,
):
self.client = AsyncClient()
self.player_id = player_id
if cookies is None:
self.cookies = CookiesModel()
else:
if isinstance(cookies, dict):
self.cookies = CookiesModel(**cookies)
elif isinstance(cookies, CookiesModel):
self.cookies = cookies
else:
raise RuntimeError
if user_id:
self.user_id = user_id
else:
self.user_id = self.cookies.user_id
async def get_stoken_by_login_ticket(self) -> bool:
if self.cookies.login_ticket is None and self.user_id is None:
return False
params = {"login_ticket": self.cookies.login_ticket, "uid": self.user_id, "token_types": 3}
data = await self.client.get(self.get_STOKEN_URL, params=params, headers={"User-Agent": self.USER_AGENT})
res_json = data.json()
res_data = res_json.get("data", {}).get("list", [])
for i in res_data:
name = i.get("name")
token = i.get("token")
if name and token:
if hasattr(self.cookies, name):
setattr(self.cookies, name, token)
if self.cookies.stoken:
if self.cookies.stuid:
self.cookies.stuid = self.user_id
return True
return False
async def get_ltoken_by_game_token(self, game_token: str) -> bool:
if self.user_id is None:
return False
data = {"account_id": self.user_id, "game_token": game_token}
headers = {
"x-rpc-aigis": "",
"Content-Type": "application/json",
"Accept": "application/json",
"x-rpc-game_biz": "bbs_cn",
"x-rpc-sys_version": "11",
"x-rpc-device_id": get_device_id(self.USER_AGENT),
"x-rpc-device_fp": "".join(random.choices((ascii_letters + digits), k=13)),
"x-rpc-device_name": "Chrome 108.0.0.0",
"x-rpc-device_model": "Windows 10 64-bit",
"x-rpc-app_id": "bll8iq97cem8",
"User-Agent": "okhttp/4.8.0",
}
app_version, client_type, ds_sign = get_ds(new_ds=True, data=data)
headers["x-rpc-app_version"] = app_version
headers["x-rpc-client_type"] = client_type
headers["DS"] = ds_sign
res = await self.client.post(
self.GET_TOKEN_BY_GAME_LTOKEN_API,
headers=headers,
json={"account_id": self.user_id, "game_token": game_token},
)
ltoken_data = res.json()
self.cookies.ltmid_v2 = ltoken_data["data"]["user_info"]["mid"]
self.cookies.ltoken_v2 = ltoken_data["data"]["token"]["token"]
return True
async def create_qrcode_login(self) -> tuple[str, str]:
self.device_id = get_device_id("".join(random.choices((ascii_letters + digits), k=64)))
data = {"app_id": "8", "device": self.device_id}
res = await self.client.post(self.QRCODE_GEN_API, json=data)
res_json = res.json()
url = res_json.get("data", {}).get("url", "")
if not url:
return "", ""
ticket = url.split("ticket=")[1]
return url, ticket
async def _get_cookie_token_data(self, game_token: str, account_id: int) -> Dict:
res = await self.client.get(
self.GET_COOKIE_ACCOUNT_BY_GAME_TOKEN_API,
params={"game_token": game_token, "account_id": account_id},
)
return res.json()
async def _set_cookie_by_game_token(self, data: Dict) -> bool:
game_token = json.loads(data.get("payload", {}).get("raw", "{}"))
if not game_token:
return False
uid = game_token["uid"]
self.user_id = int(uid)
cookie_token_data = await self._get_cookie_token_data(game_token["token"], self.user_id)
await self.get_ltoken_by_game_token(game_token["token"])
cookie_token = cookie_token_data["data"]["cookie_token"]
self.cookies.cookie_token = cookie_token
self.cookies.account_id = game_token["uid"]
return True
async def check_qrcode_login(self, ticket: str):
data = {"app_id": "8", "ticket": ticket, "device": self.device_id}
for _ in range(20):
await asyncio.sleep(10)
res = await self.client.post(self.QRCODE_GET_API, json=data)
res_json = res.json()
ret_code = res_json.get("retcode", 1)
if ret_code != 0:
logger.debug("QRCODE_GET_API: [%s]%s", res_json.get("retcode"), res_json.get("message"))
return False
logger.debug("QRCODE_GET_API: %s", res_json.get("data"))
res_data = res_json.get("data", {})
if res_data.get("stat", "") == "Confirmed":
return await self._set_cookie_by_game_token(res_json.get("data", {}))
async def get_cookie_token_by_stoken(self) -> bool:
if self.cookies.stoken is None:
return False
user_id = self.cookies.user_id
headers = {
"x-rpc-app_version": "2.11.1",
"User-Agent": (
"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1"
),
"x-rpc-client_type": "5",
"Referer": "https://webstatic.mihoyo.com/",
"Origin": "https://webstatic.mihoyo.com",
}
params = {
"stoken": self.cookies.stoken,
"uid": user_id,
}
res = await self.client.get(
self.GET_COOKIES_TOKEN_BY_STOKEN_API,
headers=headers,
params=params,
)
res_json = res.json()
cookie_token = res_json.get("data", {}).get("cookie_token", "")
if cookie_token:
self.cookies.cookie_token = cookie_token
self.cookies.account_id = user_id
return True
return False
async def get_ltoken_by_stoken(self) -> bool:
if self.cookies.stoken is None:
return False
user_id = self.cookies.user_id
headers = {
"x-rpc-app_version": "2.11.1",
"User-Agent": (
"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1"
),
"x-rpc-client_type": "5",
"Referer": "https://webstatic.mihoyo.com/",
"Origin": "https://webstatic.mihoyo.com",
}
params = {
"stoken": self.cookies.stoken,
"uid": user_id,
}
res = await self.client.get(
self.GET_LTOKEN_BY_STOKEN_API,
headers=headers,
params=params,
)
res_json = res.json()
ltoken = res_json.get("data", {}).get("ltoken", "")
if ltoken:
self.cookies.ltoken = ltoken
self.cookies.ltuid = user_id
return True
return False
@staticmethod
def generate_qrcode(url: str) -> bytes:
qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4)
qr.add_data(url)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
bio = BytesIO()
img.save(bio, format="PNG")
return bio.getvalue()

View File

@ -1,172 +0,0 @@
import asyncio
import json
import random
import qrcode
from io import BytesIO
from string import ascii_letters, digits
from typing import Dict
from httpx import AsyncClient
from ...logger import logger
from ...utility.helpers import get_device_id, get_ds
__all__ = ("SignIn",)
class SignIn:
S_TOKEN_URL = (
"https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?login_ticket={0}&token_types=3&uid={1}"
)
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
)
QRCODE_GEN_API = "https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/fetch"
QRCODE_GET_API = "https://hk4e-sdk.mihoyo.com/hk4e_cn/combo/panda/qrcode/query"
GAME_TOKEN_API = "https://api-takumi.mihoyo.com/auth/api/getCookieAccountInfoByGameToken"
GAME_LTOKEN_API = "https://passport-api.mihoyo.com/account/ma-cn-session/app/getTokenByGameToken"
COOKIES_GET_API = "https://passport-api.mihoyo.com/account/auth/api/getCookieAccountInfoBySToken"
def __init__(self, uid: int = 0, cookie: Dict = None):
self.client = AsyncClient()
self.uid = uid
self.cookie = cookie.copy() if cookie is not None else {}
self.parse_uid()
self.ticket = None
self.device_id = None
def parse_uid(self):
"""
从cookie中获取uid
:param self:
:return:
"""
if not self.cookie:
return
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
if item in self.cookie:
self.uid = self.cookie[item]
break
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
self.cookie[item] = self.uid
async def get_s_token(self):
if not self.cookie.get("login_ticket") or not self.uid:
return
data = await self.client.get(
self.S_TOKEN_URL.format(self.cookie["login_ticket"], self.uid), headers={"User-Agent": self.USER_AGENT}
)
res_json = data.json()
res_data = res_json.get("data", {}).get("list", [])
for i in res_data:
if i.get("name") and i.get("token"):
self.cookie[i.get("name")] = i.get("token")
async def get_ltoken_by_game_token(self, game_token: str):
data = {"account_id": self.uid, "game_token": game_token}
headers = {
"x-rpc-aigis": "",
"Content-Type": "application/json",
"Accept": "application/json",
"x-rpc-game_biz": "bbs_cn",
"x-rpc-sys_version": "11",
"x-rpc-device_id": get_device_id(self.USER_AGENT),
"x-rpc-device_fp": "".join(random.choices((ascii_letters + digits), k=13)),
"x-rpc-device_name": "Chrome 108.0.0.0",
"x-rpc-device_model": "Windows 10 64-bit",
"x-rpc-app_id": "bll8iq97cem8",
"User-Agent": "okhttp/4.8.0",
}
app_version, client_type, ds_sign = get_ds(new_ds=True, data=data)
headers["x-rpc-app_version"] = app_version
headers["x-rpc-client_type"] = client_type
headers["DS"] = ds_sign
res = await self.client.post(
self.GAME_LTOKEN_API,
headers=headers,
json={"account_id": self.uid, "game_token": game_token},
)
return res.json()
async def create_login_data(self) -> str:
self.device_id = get_device_id("".join(random.choices((ascii_letters + digits), k=64)))
data = {"app_id": "8", "device": self.device_id}
res = await self.client.post(self.QRCODE_GEN_API, json=data)
res_json = res.json()
url = res_json.get("data", {}).get("url", "")
if not url:
return ""
self.ticket = url.split("ticket=")[1]
return url
async def get_cookie_token_data(self, game_token: str) -> Dict:
res = await self.client.get(
self.GAME_TOKEN_API,
params={"game_token": game_token, "account_id": self.uid},
)
return res.json()
async def set_cookie(self, data: Dict) -> bool:
self.cookie = {}
game_token = json.loads(data.get("payload", {}).get("raw", "{}"))
if not game_token:
return False
self.uid = int(game_token["uid"])
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
self.cookie[item] = str(self.uid)
cookie_token_data = await self.get_cookie_token_data(game_token["token"])
ltoken_data = await self.get_ltoken_by_game_token(game_token["token"])
self.cookie["cookie_token"] = cookie_token_data["data"]["cookie_token"]
for item in ["account_mid_v2", "ltmid_v2"]:
self.cookie[item] = ltoken_data["data"]["user_info"]["mid"]
self.cookie["ltoken_v2"] = ltoken_data["data"]["token"]["token"]
return True
async def check_login(self):
data = {"app_id": "8", "ticket": self.ticket, "device": self.device_id}
for _ in range(20):
await asyncio.sleep(10)
res = await self.client.post(self.QRCODE_GET_API, json=data)
res_json = res.json()
ret_code = res_json.get("retcode", 1)
if ret_code != 0:
logger.debug("QRCODE_GET_API: [%s]%s", res_json.get("retcode"), res_json.get("message"))
return False
logger.debug("QRCODE_GET_API: %s", res_json.get("data"))
res_data = res_json.get("data", {})
if res_data.get("stat", "") == "Confirmed":
return await self.set_cookie(res_json.get("data", {}))
async def get_cookie_account_info_by_stoken(self, stoken, uid):
headers = {
"x-rpc-app_version": "2.11.1",
"User-Agent": (
"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) "
"AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1"
),
"x-rpc-client_type": "5",
"Referer": "https://webstatic.mihoyo.com/",
"Origin": "https://webstatic.mihoyo.com",
}
params = {
"stoken": stoken,
"uid": uid,
}
res = await self.client.get(
self.COOKIES_GET_API,
headers=headers,
params=params,
)
res_json = res.json()
return res_json.get("data", {}).get("cookie_token", "")
@staticmethod
def generate_qrcode(url: str) -> bytes:
qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4)
qr.add_data(url)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
bio = BytesIO()
img.save(bio, format="PNG")
return bio.getvalue()

View File

@ -0,0 +1,63 @@
from typing import Optional, TypeVar
from pydantic import BaseModel
IntStr = TypeVar("IntStr", int, str)
__all__ = ("CookiesModel",)
class CookiesModel(BaseModel):
login_uid: Optional[IntStr] = None
login_ticket: Optional[str] = None
stoken: Optional[str] = None
stuid: Optional[IntStr] = None
account_id: Optional[IntStr] = None
cookie_token: Optional[str] = None
ltoken: Optional[str] = None
ltuid: Optional[IntStr] = None
account_mid_v2: Optional[str] = None
cookie_token_v2: Optional[str] = None
ltoken_v2: Optional[str] = None
ltmid_v2: Optional[str] = None
@property
def is_v1(self) -> bool:
if self.account_id or self.cookie_token or self.ltoken or self.ltuid:
return True
return False
@property
def is_v2(self) -> bool:
if self.account_mid_v2 or self.cookie_token_v2 or self.ltoken_v2 or self.ltmid_v2:
return True
return False
def remove_v2(self):
self.account_mid_v2 = None
self.cookie_token_v2 = None
self.ltoken_v2 = None
self.ltmid_v2 = None
def to_dict(self):
return self.dict(exclude_defaults=True)
def to_json(self):
return self.json(exclude_defaults=True)
@property
def user_id(self) -> Optional[int]:
if self.ltuid:
return self.ltuid
if self.account_id:
return self.account_id
if self.login_uid:
return self.login_uid
if self.stuid:
return self.stuid
return None

View File

@ -12,13 +12,12 @@ from telegram.helpers import escape_markdown
from core.baseplugin import BasePlugin
from core.cookies.error import CookiesNotFoundError
from core.cookies.models import Cookies
from core.cookies.services import CookiesService
from core.plugin import Plugin, conversation, handler
from core.user.error import UserNotFoundError
from core.user.models import User
from core.user.services import UserService
from modules.apihelper.client.components.signin import SignIn
from modules.apihelper.client.components.authclient import AuthClient
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.log import logger
@ -31,7 +30,6 @@ class AddUserCommandData(TelegramObject):
cookies: dict = {}
game_uid: int = 0
phone: int = 0
sign_in_client: Optional[SignIn] = None
CHECK_SERVER, INPUT_COOKIES, COMMAND_RESULT = range(10100, 10103)
@ -104,13 +102,13 @@ class SetUserCookies(Plugin.Conversation, BasePlugin.Conversation):
else:
await message.reply_text("警告你已经绑定Cookie如果继续操作会覆盖当前Cookie。")
add_user_command_data.user = user_info
sign_in_client = SignIn()
url = await sign_in_client.create_login_data()
data = sign_in_client.generate_qrcode(url)
auth_client = AuthClient()
url, ticket = await auth_client.create_qrcode_login()
data = auth_client.generate_qrcode(url)
text = f"你好 {user.mention_html()} 该绑定方法仅支持国服请在3分钟内使用米游社扫码并确认进行绑定。"
await message.reply_photo(data, caption=text, parse_mode=ParseMode.HTML)
if await sign_in_client.check_login():
add_user_command_data.cookies = sign_in_client.cookie
if await auth_client.check_qrcode_login(ticket):
add_user_command_data.cookies = auth_client.cookies.to_dict()
return await self.check_cookies(update, context)
else:
await message.reply_markdown_v2("可能是验证码已过期或者你没有同意授权,请重新发送命令进行绑定。")
@ -272,16 +270,17 @@ class SetUserCookies(Plugin.Conversation, BasePlugin.Conversation):
return ConversationHandler.END
with contextlib.suppress(Exception):
if cookies.get("login_ticket"):
sign_in_client = SignIn(cookie=add_user_command_data.cookies)
await sign_in_client.get_s_token()
add_user_command_data.cookies = sign_in_client.cookie
logger.success("用户 %s[%s] 绑定时获取 stoken 成功", user.full_name, user.id)
stoken = add_user_command_data.cookies.get("stoken")
account_id = add_user_command_data.cookies.get("account_id")
if stoken and account_id:
cookie_token = await sign_in_client.get_cookie_account_info_by_stoken(stoken, account_id)
add_user_command_data.cookies["cookie_token"] = cookie_token
logger.success("用户 %s[%s] 绑定时获取 cookie_token 成功", user.full_name, user.id)
auth_client = AuthClient(cookies=add_user_command_data.cookies)
if await auth_client.get_stoken_by_login_ticket():
logger.success("用户 %s[%s] 绑定时获取 stoken 成功", user.full_name, user.id)
add_user_command_data.cookies = auth_client.cookies.to_dict()
if await auth_client.get_cookie_token_by_stoken():
logger.success("用户 %s[%s] 绑定时获取 cookie_token 成功", user.full_name, user.id)
add_user_command_data.cookies = auth_client.cookies.to_dict()
if await auth_client.get_ltoken_by_stoken():
logger.success("用户 %s[%s] 绑定时获取 ltoken 成功", user.full_name, user.id)
auth_client.cookies.remove_v2()
add_user_command_data.cookies = auth_client.cookies.to_dict()
user_info: Optional[GenshinAccount] = None
level: int = 0
# todo : 多账号绑定