miui-auto-tasks/utils/utils.py
Xiwangly 6ade926505
迁移依赖&修复bug (#190)
* add missing dependency: apscheduler
fix some bugs in Dockerfile

* 🔧 自动更新requirements

* fix:
解决某些情况下不能正确生成、读取配置文件
update:
将python版本升级到3.12,迁移pydantic v2依赖
修改Dockerfile

* 🔧 自动更新requirements

* Pylint

* Hadolint - DL3042

https://github.com/hadolint/hadolint/wiki/DL3042

---------

Co-authored-by: JaHIY <jaklsy@gmail.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Night-stars-1 <99261160+Night-stars-1@users.noreply.github.com>
2023-11-22 12:02:10 +08:00

242 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""工具类"""
import base64
import random
import time
from typing import Type
from urllib.parse import parse_qsl, urlparse
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding, serialization
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from pydantic import ValidationError
from tenacity import RetryError, Retrying, stop_after_attempt
from .captcha import get_validate
from .data_model import TokenResultHandler
from .logger import log
from .request import post
PUBLIC_KEY_PEM = '''-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArxfNLkuAQ/BYHzkzVwtu
g+0abmYRBVCEScSzGxJIOsfxVzcuqaKO87H2o2wBcacD3bRHhMjTkhSEqxPjQ/FE
XuJ1cdbmr3+b3EQR6wf/cYcMx2468/QyVoQ7BADLSPecQhtgGOllkC+cLYN6Md34
Uii6U+VJf0p0q/saxUTZvhR2ka9fqJ4+6C6cOghIecjMYQNHIaNW+eSKunfFsXVU
+QfMD0q2EM9wo20aLnos24yDzRjh9HJc6xfr37jRlv1/boG/EABMG9FnTm35xWrV
R0nw3cpYF7GZg13QicS/ZwEsSd4HyboAruMxJBPvK3Jdr4ZS23bpN0cavWOJsBqZ
VwIDAQAB
-----END PUBLIC KEY-----'''
headers = {
'Accept': '*/*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Content-type': 'application/x-www-form-urlencoded',
'Origin': 'https://web.vip.miui.com',
'Pragma': 'no-cache',
'Referer': 'https://web.vip.miui.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'cross-site',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'sec-ch-ua': '"Microsoft Edge";v="119", "Chromium";v="119", "Not?A_Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
}
def get_random_chars_as_string(count: int) -> str:
"""获取随机字符串"""
characters = list('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890!@#,%^&*()-=_+~`{}[]|:<>.?/')
selected_chars = random.sample(characters, count)
return ''.join(selected_chars)
def aes_encrypt(key: str, data: str) -> base64:
"""AES加密"""
iv = b'0102030405060708' # pylint: disable=invalid-name
cipher = Cipher(algorithms.AES(key.encode('utf-8')), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(data.encode('utf-8')) + padder.finalize()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return base64.b64encode(ciphertext).decode('utf-8')
def rsa_encrypt(public_key_pem: str, data: str) -> base64:
"""RSA加密"""
public_key = serialization.load_pem_public_key(
public_key_pem.encode('utf-8'),
backend=default_backend()
)
encoded_data = base64.b64encode(data.encode('utf-8'))
ciphertext = public_key.encrypt(
encoded_data,
PKCS1v15()
)
return base64.b64encode(ciphertext).decode('utf-8')
IncorrectReturn = (KeyError, TypeError, AttributeError, IndexError, ValidationError)
"""API返回数据无效会触发的异常组合"""
def is_incorrect_return(exception: Exception, *addition_exceptions: Type[Exception]) -> bool:
"""
判断是否是API返回数据无效的异常
:param exception: 异常对象
:param addition_exceptions: 额外的异常类型,用于触发判断
"""
exceptions = IncorrectReturn + addition_exceptions
return isinstance(exception, exceptions) or isinstance(exception.__cause__, exceptions)
async def get_token_by_captcha(url: str) -> str | bool:
"""通过人机验证码获取TOKEN"""
try:
parsed_url = urlparse(url)
query_params = dict(parse_qsl(parsed_url.query)) # 解析URL参数
gt = query_params.get("c") # pylint: disable=invalid-name
challenge = query_params.get("l")
geetest_data = await get_validate(gt, challenge)
params = {
'k': '3dc42a135a8d45118034d1ab68213073',
'locale': 'zh_CN',
'_t': round(time.time() * 1000),
}
data = {
'e': query_params.get("e"), # 人机验证的e参数来自URL
'challenge': geetest_data.challenge,
'seccode': f'{geetest_data.validate}|jordan',
}
response = await post('https://verify.sec.xiaomi.com/captcha/v2/gt/dk/verify', params=params, headers=headers,
data=data)
log.debug(response.text)
result = response.json()
api_data = TokenResultHandler(result)
if api_data.success:
return api_data.token
elif not api_data.data.get("result"):
log.error("遇到人机验证码无法获取TOKEN")
return False
else:
log.error("遇到未知错误无法获取TOKEN")
return False
except Exception: # pylint: disable=broad-exception-caught
log.exception("获取TOKEN异常")
return False
# pylint: disable=trailing-whitespace
async def get_token(uid: str) -> str | bool:
"""获取TOKEN"""
try:
for attempt in Retrying(stop=stop_after_attempt(3)):
with attempt:
data = {
"type": 0,
"startTs": round(time.time() * 1000),
"endTs": round(time.time() * 1000),
"env": {
"p1": "",
"p2": "",
"p3": "",
"p4": "",
"p5": "",
"p6": "",
"p7": "",
"p8": "",
"p9": "",
"p10": "",
"p11": "",
"p12": "",
"p13": "",
"p14": "",
"p15": "",
"p16": "",
"p17": "",
"p18": "",
"p19": "",
"p20": "",
"p21": "",
"p22": "",
"p23": "",
"p24": "",
"p25": "",
"p26": "",
"p28": "",
"p29": "",
"p30": "",
"p31": "",
"p32": "",
"p33": [],
"p34": ""
},
"action": {
"a1": [],
"a2": [],
"a3": [],
"a4": [],
"a5": [],
"a6": [],
"a7": [],
"a8": [],
"a9": [],
"a10": [],
"a11": [],
"a12": [],
"a13": [],
"a14": []
},
"force": False,
"talkBack": False,
"uid": uid,
"nonce": {
"t": round(time.time()),
"r": round(time.time())
},
"version": "2.0",
"scene": "GROW_UP_CHECKIN"
}
key = get_random_chars_as_string(16)
params = {
'k': '3dc42a135a8d45118034d1ab68213073',
'locale': 'zh_CN',
'_t': round(time.time() * 1000),
}
data = {
's': rsa_encrypt(PUBLIC_KEY_PEM, key),
'd': aes_encrypt(key, str(data)),
'a': 'GROW_UP_CHECKIN',
}
response = await post('https://verify.sec.xiaomi.com/captcha/v2/data', params=params, headers=headers,
data=data)
log.debug(response.text)
result = response.json()
api_data = TokenResultHandler(result)
if api_data.success:
return api_data.token
elif api_data.need_verify:
log.error("遇到人机验证码, 尝试调用解决方案")
url = api_data.data.get("url")
if token := await get_token_by_captcha(url):
return token
else:
raise ValueError("人机验证失败")
else:
log.error("遇到未知错误无法获取TOKEN")
return False
except RetryError as error:
if is_incorrect_return(error):
log.exception(f"TOKEN - 服务器没有正确返回 {response.text}")
else:
log.exception("获取TOKEN异常")
return False