mirror of
https://github.com/0-8-4/miui-auto-tasks.git
synced 2024-11-24 17:19:41 +00:00
6ade926505
* add missing dependency: apscheduler fix some bugs in Dockerfile * 🔧 自动更新requirements * fix: 解决某些情况下不能正确生成、读取配置文件 update: 将python版本升级到3.12,迁移pydantic v2依赖 修改Dockerfile * 🔧 自动更新requirements * Pylint * Hadolint - DL3042 https://github.com/hadolint/hadolint/wiki/DL3042 --------- Co-authored-by: JaHIY <jaklsy@gmail.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Night-stars-1 <99261160+Night-stars-1@users.noreply.github.com>
242 lines
8.9 KiB
Python
242 lines
8.9 KiB
Python
"""工具类"""
|
||
import base64
|
||
import random
|
||
import time
|
||
from typing import Type
|
||
from urllib.parse import parse_qsl, urlparse
|
||
|
||
from cryptography.hazmat.backends import default_backend
|
||
from cryptography.hazmat.primitives import padding, serialization
|
||
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
||
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||
from pydantic import ValidationError
|
||
from tenacity import RetryError, Retrying, stop_after_attempt
|
||
|
||
from .captcha import get_validate
|
||
from .data_model import TokenResultHandler
|
||
from .logger import log
|
||
from .request import post
|
||
|
||
PUBLIC_KEY_PEM = '''-----BEGIN PUBLIC KEY-----
|
||
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArxfNLkuAQ/BYHzkzVwtu
|
||
g+0abmYRBVCEScSzGxJIOsfxVzcuqaKO87H2o2wBcacD3bRHhMjTkhSEqxPjQ/FE
|
||
XuJ1cdbmr3+b3EQR6wf/cYcMx2468/QyVoQ7BADLSPecQhtgGOllkC+cLYN6Md34
|
||
Uii6U+VJf0p0q/saxUTZvhR2ka9fqJ4+6C6cOghIecjMYQNHIaNW+eSKunfFsXVU
|
||
+QfMD0q2EM9wo20aLnos24yDzRjh9HJc6xfr37jRlv1/boG/EABMG9FnTm35xWrV
|
||
R0nw3cpYF7GZg13QicS/ZwEsSd4HyboAruMxJBPvK3Jdr4ZS23bpN0cavWOJsBqZ
|
||
VwIDAQAB
|
||
-----END PUBLIC KEY-----'''
|
||
|
||
headers = {
|
||
'Accept': '*/*',
|
||
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
|
||
'Cache-Control': 'no-cache',
|
||
'Connection': 'keep-alive',
|
||
'Content-type': 'application/x-www-form-urlencoded',
|
||
'Origin': 'https://web.vip.miui.com',
|
||
'Pragma': 'no-cache',
|
||
'Referer': 'https://web.vip.miui.com/',
|
||
'Sec-Fetch-Dest': 'empty',
|
||
'Sec-Fetch-Mode': 'cors',
|
||
'Sec-Fetch-Site': 'cross-site',
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
|
||
'sec-ch-ua': '"Microsoft Edge";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
|
||
'sec-ch-ua-mobile': '?0',
|
||
'sec-ch-ua-platform': '"Windows"',
|
||
}
|
||
|
||
|
||
def get_random_chars_as_string(count: int) -> str:
|
||
"""获取随机字符串"""
|
||
characters = list('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890!@#,%^&*()-=_+~`{}[]|:<>.?/')
|
||
selected_chars = random.sample(characters, count)
|
||
return ''.join(selected_chars)
|
||
|
||
|
||
def aes_encrypt(key: str, data: str) -> base64:
|
||
"""AES加密"""
|
||
iv = b'0102030405060708' # pylint: disable=invalid-name
|
||
cipher = Cipher(algorithms.AES(key.encode('utf-8')), modes.CBC(iv), backend=default_backend())
|
||
encryptor = cipher.encryptor()
|
||
padder = padding.PKCS7(algorithms.AES.block_size).padder()
|
||
padded_data = padder.update(data.encode('utf-8')) + padder.finalize()
|
||
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
|
||
return base64.b64encode(ciphertext).decode('utf-8')
|
||
|
||
|
||
def rsa_encrypt(public_key_pem: str, data: str) -> base64:
|
||
"""RSA加密"""
|
||
public_key = serialization.load_pem_public_key(
|
||
public_key_pem.encode('utf-8'),
|
||
backend=default_backend()
|
||
)
|
||
encoded_data = base64.b64encode(data.encode('utf-8'))
|
||
ciphertext = public_key.encrypt(
|
||
encoded_data,
|
||
PKCS1v15()
|
||
)
|
||
|
||
return base64.b64encode(ciphertext).decode('utf-8')
|
||
|
||
|
||
IncorrectReturn = (KeyError, TypeError, AttributeError, IndexError, ValidationError)
|
||
"""API返回数据无效会触发的异常组合"""
|
||
|
||
|
||
def is_incorrect_return(exception: Exception, *addition_exceptions: Type[Exception]) -> bool:
|
||
"""
|
||
判断是否是API返回数据无效的异常
|
||
:param exception: 异常对象
|
||
:param addition_exceptions: 额外的异常类型,用于触发判断
|
||
"""
|
||
exceptions = IncorrectReturn + addition_exceptions
|
||
return isinstance(exception, exceptions) or isinstance(exception.__cause__, exceptions)
|
||
|
||
|
||
async def get_token_by_captcha(url: str) -> str | bool:
|
||
"""通过人机验证码获取TOKEN"""
|
||
try:
|
||
parsed_url = urlparse(url)
|
||
query_params = dict(parse_qsl(parsed_url.query)) # 解析URL参数
|
||
gt = query_params.get("c") # pylint: disable=invalid-name
|
||
challenge = query_params.get("l")
|
||
geetest_data = await get_validate(gt, challenge)
|
||
params = {
|
||
'k': '3dc42a135a8d45118034d1ab68213073',
|
||
'locale': 'zh_CN',
|
||
'_t': round(time.time() * 1000),
|
||
}
|
||
|
||
data = {
|
||
'e': query_params.get("e"), # 人机验证的e参数,来自URL
|
||
'challenge': geetest_data.challenge,
|
||
'seccode': f'{geetest_data.validate}|jordan',
|
||
}
|
||
|
||
response = await post('https://verify.sec.xiaomi.com/captcha/v2/gt/dk/verify', params=params, headers=headers,
|
||
data=data)
|
||
log.debug(response.text)
|
||
result = response.json()
|
||
api_data = TokenResultHandler(result)
|
||
if api_data.success:
|
||
return api_data.token
|
||
elif not api_data.data.get("result"):
|
||
log.error("遇到人机验证码,无法获取TOKEN")
|
||
return False
|
||
else:
|
||
log.error("遇到未知错误,无法获取TOKEN")
|
||
return False
|
||
except Exception: # pylint: disable=broad-exception-caught
|
||
log.exception("获取TOKEN异常")
|
||
return False
|
||
|
||
|
||
# pylint: disable=trailing-whitespace
|
||
async def get_token(uid: str) -> str | bool:
|
||
"""获取TOKEN"""
|
||
try:
|
||
for attempt in Retrying(stop=stop_after_attempt(3)):
|
||
with attempt:
|
||
data = {
|
||
"type": 0,
|
||
"startTs": round(time.time() * 1000),
|
||
"endTs": round(time.time() * 1000),
|
||
"env": {
|
||
"p1": "",
|
||
"p2": "",
|
||
"p3": "",
|
||
"p4": "",
|
||
"p5": "",
|
||
"p6": "",
|
||
"p7": "",
|
||
"p8": "",
|
||
"p9": "",
|
||
"p10": "",
|
||
"p11": "",
|
||
"p12": "",
|
||
"p13": "",
|
||
"p14": "",
|
||
"p15": "",
|
||
"p16": "",
|
||
"p17": "",
|
||
"p18": "",
|
||
"p19": "",
|
||
"p20": "",
|
||
"p21": "",
|
||
"p22": "",
|
||
"p23": "",
|
||
"p24": "",
|
||
"p25": "",
|
||
"p26": "",
|
||
"p28": "",
|
||
"p29": "",
|
||
"p30": "",
|
||
"p31": "",
|
||
"p32": "",
|
||
"p33": [],
|
||
"p34": ""
|
||
},
|
||
"action": {
|
||
"a1": [],
|
||
"a2": [],
|
||
"a3": [],
|
||
"a4": [],
|
||
"a5": [],
|
||
"a6": [],
|
||
"a7": [],
|
||
"a8": [],
|
||
"a9": [],
|
||
"a10": [],
|
||
"a11": [],
|
||
"a12": [],
|
||
"a13": [],
|
||
"a14": []
|
||
},
|
||
"force": False,
|
||
"talkBack": False,
|
||
"uid": uid,
|
||
"nonce": {
|
||
"t": round(time.time()),
|
||
"r": round(time.time())
|
||
},
|
||
"version": "2.0",
|
||
"scene": "GROW_UP_CHECKIN"
|
||
}
|
||
|
||
key = get_random_chars_as_string(16)
|
||
|
||
params = {
|
||
'k': '3dc42a135a8d45118034d1ab68213073',
|
||
'locale': 'zh_CN',
|
||
'_t': round(time.time() * 1000),
|
||
}
|
||
|
||
data = {
|
||
's': rsa_encrypt(PUBLIC_KEY_PEM, key),
|
||
'd': aes_encrypt(key, str(data)),
|
||
'a': 'GROW_UP_CHECKIN',
|
||
}
|
||
response = await post('https://verify.sec.xiaomi.com/captcha/v2/data', params=params, headers=headers,
|
||
data=data)
|
||
log.debug(response.text)
|
||
result = response.json()
|
||
api_data = TokenResultHandler(result)
|
||
if api_data.success:
|
||
return api_data.token
|
||
elif api_data.need_verify:
|
||
log.error("遇到人机验证码, 尝试调用解决方案")
|
||
url = api_data.data.get("url")
|
||
if token := await get_token_by_captcha(url):
|
||
return token
|
||
else:
|
||
raise ValueError("人机验证失败")
|
||
else:
|
||
log.error("遇到未知错误,无法获取TOKEN")
|
||
return False
|
||
except RetryError as error:
|
||
if is_incorrect_return(error):
|
||
log.exception(f"TOKEN - 服务器没有正确返回 {response.text}")
|
||
else:
|
||
log.exception("获取TOKEN异常")
|
||
return False
|