mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-16 03:55:26 +00:00
✨ qlogin support get full token
This commit is contained in:
parent
ba53bcdb0a
commit
1fafbea483
@ -1,144 +0,0 @@
|
||||
import asyncio
|
||||
import json
|
||||
import random
|
||||
import qrcode
|
||||
|
||||
from io import BytesIO
|
||||
from string import ascii_letters, digits
|
||||
from typing import Dict, Union, Optional
|
||||
from httpx import AsyncClient
|
||||
from qrcode.image.pure import PyPNGImage
|
||||
|
||||
from ...logger import logger
|
||||
from ...models.genshin.cookies import CookiesModel
|
||||
from ...utility.devices import devices_methods
|
||||
from ...utility.helpers import get_device_id, get_ds
|
||||
|
||||
__all__ = ("AuthClient",)
|
||||
|
||||
|
||||
class AuthClient:
|
||||
player_id: Optional[int] = None
|
||||
user_id: Optional[int] = None
|
||||
cookies: Optional[CookiesModel] = None
|
||||
device_id: Optional[str] = None
|
||||
|
||||
USER_AGENT = (
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
|
||||
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
|
||||
)
|
||||
PASSPORT_HOST = "passport-api.mihoyo.com"
|
||||
HK4E_SDK_HOST = "hk4e-sdk.mihoyo.com"
|
||||
TAKUMI_HOST = "api-takumi.mihoyo.com"
|
||||
QRCODE_GEN_API = f"https://{HK4E_SDK_HOST}/hk4e_cn/combo/panda/qrcode/fetch"
|
||||
QRCODE_GET_API = f"https://{HK4E_SDK_HOST}/hk4e_cn/combo/panda/qrcode/query"
|
||||
GET_COOKIE_ACCOUNT_BY_GAME_TOKEN_API = f"https://{TAKUMI_HOST}/auth/api/getCookieAccountInfoByGameToken"
|
||||
GET_TOKEN_BY_GAME_LTOKEN_API = f"https://{PASSPORT_HOST}/account/ma-cn-session/app/getTokenByGameToken"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
player_id: Optional[int] = None,
|
||||
user_id: Optional[int] = None,
|
||||
cookies: Optional[Union[CookiesModel, dict]] = None,
|
||||
):
|
||||
self.client = AsyncClient()
|
||||
self.player_id = player_id
|
||||
if cookies is None:
|
||||
self.cookies = CookiesModel()
|
||||
else:
|
||||
if isinstance(cookies, dict):
|
||||
self.cookies = CookiesModel(**cookies)
|
||||
elif isinstance(cookies, CookiesModel):
|
||||
self.cookies = cookies
|
||||
else:
|
||||
raise RuntimeError
|
||||
if user_id:
|
||||
self.user_id = user_id
|
||||
else:
|
||||
self.user_id = self.cookies.user_id
|
||||
|
||||
async def get_ltoken_by_game_token(self, game_token: str) -> bool:
|
||||
if self.user_id is None:
|
||||
return False
|
||||
data = {"account_id": self.user_id, "game_token": game_token}
|
||||
headers = {
|
||||
"x-rpc-aigis": "",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
"x-rpc-game_biz": "bbs_cn",
|
||||
"x-rpc-sys_version": "11",
|
||||
"x-rpc-device_name": "Chrome 108.0.0.0",
|
||||
"x-rpc-device_model": "Windows 10 64-bit",
|
||||
"x-rpc-app_id": "bll8iq97cem8",
|
||||
"User-Agent": "okhttp/4.8.0",
|
||||
}
|
||||
await devices_methods.update_device_headers(self.user_id, headers)
|
||||
app_version, client_type, ds_sign = get_ds(new_ds=True, data=data)
|
||||
headers["x-rpc-app_version"] = app_version
|
||||
headers["x-rpc-client_type"] = client_type
|
||||
headers["DS"] = ds_sign
|
||||
res = await self.client.post(
|
||||
self.GET_TOKEN_BY_GAME_LTOKEN_API,
|
||||
headers=headers,
|
||||
json={"account_id": self.user_id, "game_token": game_token},
|
||||
)
|
||||
ltoken_data = res.json()
|
||||
self.cookies.ltmid_v2 = ltoken_data["data"]["user_info"]["mid"]
|
||||
self.cookies.ltoken_v2 = ltoken_data["data"]["token"]["token"]
|
||||
return True
|
||||
|
||||
async def create_qrcode_login(self) -> tuple[str, str]:
|
||||
self.device_id = get_device_id("".join(random.choices((ascii_letters + digits), k=64)))
|
||||
data = {"app_id": "8", "device": self.device_id}
|
||||
res = await self.client.post(self.QRCODE_GEN_API, json=data)
|
||||
res_json = res.json()
|
||||
url = res_json.get("data", {}).get("url", "")
|
||||
if not url:
|
||||
return "", ""
|
||||
ticket = url.split("ticket=")[1]
|
||||
return url, ticket
|
||||
|
||||
async def _get_cookie_token_data(self, game_token: str, account_id: int) -> Dict:
|
||||
res = await self.client.get(
|
||||
self.GET_COOKIE_ACCOUNT_BY_GAME_TOKEN_API,
|
||||
params={"game_token": game_token, "account_id": account_id},
|
||||
)
|
||||
return res.json()
|
||||
|
||||
async def _set_cookie_by_game_token(self, data: Dict) -> bool:
|
||||
game_token = json.loads(data.get("payload", {}).get("raw", "{}"))
|
||||
if not game_token:
|
||||
return False
|
||||
uid = game_token["uid"]
|
||||
self.user_id = int(uid)
|
||||
cookie_token_data = await self._get_cookie_token_data(game_token["token"], self.user_id)
|
||||
await self.get_ltoken_by_game_token(game_token["token"])
|
||||
cookie_token = cookie_token_data["data"]["cookie_token"]
|
||||
self.cookies.cookie_token = cookie_token
|
||||
self.cookies.account_id = game_token["uid"]
|
||||
return True
|
||||
|
||||
async def check_qrcode_login(self, ticket: str):
|
||||
data = {"app_id": "8", "ticket": ticket, "device": self.device_id}
|
||||
for _ in range(20):
|
||||
await asyncio.sleep(10)
|
||||
res = await self.client.post(self.QRCODE_GET_API, json=data)
|
||||
res_json = res.json()
|
||||
ret_code = res_json.get("retcode", 1)
|
||||
if ret_code != 0:
|
||||
logger.debug("QRCODE_GET_API: [%s]%s", res_json.get("retcode"), res_json.get("message"))
|
||||
return False
|
||||
logger.debug("QRCODE_GET_API: %s", res_json.get("data"))
|
||||
res_data = res_json.get("data", {})
|
||||
if res_data.get("stat", "") == "Confirmed":
|
||||
return await self._set_cookie_by_game_token(res_json.get("data", {}))
|
||||
|
||||
@staticmethod
|
||||
def generate_qrcode(url: str) -> bytes:
|
||||
qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4)
|
||||
qr.add_data(url)
|
||||
qr.make(fit=True)
|
||||
img = qr.make_image(image_factory=PyPNGImage, fill_color="black", back_color="white")
|
||||
bio = BytesIO()
|
||||
img.save(bio)
|
||||
return bio.getvalue()
|
@ -13,6 +13,7 @@ class CookiesModel(BaseModel):
|
||||
|
||||
stoken: Optional[str] = None
|
||||
stuid: Optional[IntStr] = None
|
||||
mid: Optional[str] = None
|
||||
|
||||
account_id: Optional[IntStr] = None
|
||||
cookie_token: Optional[str] = None
|
||||
|
@ -1,7 +1,11 @@
|
||||
import asyncio
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from typing import Dict, Optional
|
||||
|
||||
import qrcode
|
||||
from arkowrapper import ArkoWrapper
|
||||
from qrcode.image.pure import PyPNGImage
|
||||
from simnet import StarRailClient, Region
|
||||
from simnet.errors import DataNotPublic, InvalidCookies, BadRequest as SimnetBadRequest
|
||||
from simnet.models.lab.record import Account
|
||||
@ -16,7 +20,6 @@ from core.services.cookies.models import CookiesDataBase as Cookies, CookiesStat
|
||||
from core.services.cookies.services import CookiesService
|
||||
from core.services.players.models import PlayersDataBase as Player, PlayerInfoSQLModel
|
||||
from core.services.players.services import PlayersService, PlayerInfoService
|
||||
from modules.apihelper.client.components.authclient import AuthClient
|
||||
from modules.apihelper.models.genshin.cookies import CookiesModel
|
||||
from utils.log import logger
|
||||
|
||||
@ -93,6 +96,16 @@ class AccountCookiesPlugin(Plugin.Conversation):
|
||||
await message.reply_markdown_v2(text, reply_markup=ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True))
|
||||
return CHECK_SERVER
|
||||
|
||||
@staticmethod
|
||||
def generate_qrcode(url: str) -> bytes:
|
||||
qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4)
|
||||
qr.add_data(url)
|
||||
qr.make(fit=True)
|
||||
img = qr.make_image(image_factory=PyPNGImage, fill_color="black", back_color="white")
|
||||
bio = BytesIO()
|
||||
img.save(bio)
|
||||
return bio.getvalue()
|
||||
|
||||
@conversation.entry_point
|
||||
@handler.command("qlogin", filters=filters.ChatType.PRIVATE, block=False)
|
||||
async def qrcode_login(self, update: Update, context: CallbackContext):
|
||||
@ -106,14 +119,25 @@ class AccountCookiesPlugin(Plugin.Conversation):
|
||||
else:
|
||||
account_cookies_plugin_data.reset()
|
||||
account_cookies_plugin_data.region = RegionEnum.HYPERION
|
||||
auth_client = AuthClient()
|
||||
url, ticket = await auth_client.create_qrcode_login()
|
||||
data = auth_client.generate_qrcode(url)
|
||||
text = f"你好 {user.mention_html()} !该绑定方法仅支持国服,请在3分钟内使用米游社扫码并确认进行绑定。"
|
||||
await message.reply_photo(data, caption=text, parse_mode=ParseMode.HTML)
|
||||
if await auth_client.check_qrcode_login(ticket):
|
||||
account_cookies_plugin_data.cookies = auth_client.cookies.to_dict()
|
||||
return await self.check_cookies(update, context)
|
||||
async with StarRailClient(region=Region.CHINESE) as client:
|
||||
url, ticket = await client.gen_login_qrcode()
|
||||
data = self.generate_qrcode(url)
|
||||
text = f"你好 {user.mention_html()} !该绑定方法仅支持国服,请在3分钟内使用米游社扫码并确认进行绑定。"
|
||||
await message.reply_photo(data, caption=text, parse_mode=ParseMode.HTML)
|
||||
for _ in range(20):
|
||||
await asyncio.sleep(10)
|
||||
try:
|
||||
if game_token := await client.check_login_qrcode(ticket):
|
||||
cookies = {"stuid": str(client.account_id), "ltuid": str(client.account_id)}
|
||||
cookies["stoken"], cookies["mid"] = await client.get_stoken_v2_and_mid_by_game_token(game_token)
|
||||
cookies["cookie_token"] = await client.get_cookie_token_by_stoken()
|
||||
cookies["ltoken"] = await client.get_ltoken_by_stoken()
|
||||
account_cookies_plugin_data.cookies = cookies
|
||||
return await self.check_cookies(update, context)
|
||||
except SimnetBadRequest as e:
|
||||
if e.ret_code == -106:
|
||||
break
|
||||
raise e
|
||||
await message.reply_markdown_v2("可能是验证码已过期或者你没有同意授权,请重新发送命令进行绑定。")
|
||||
return ConversationHandler.END
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user