chore: captcha的参数拷贝,避免修改原参数 (#172)

* chore: captcha的参数拷贝,避免修改原参数
chore: 提升pylint分数,使代码更加规范

* fix: `@validator` cannot be applied to instance methods

* deleted:    .pylintrc

* chore: no-else-return

* chore: 优化代码,使其更符合PEP 8规范

* chore: 优化代码,使其更符合PEP 8规范

* chore: 优化代码,使其更符合PEP 8规范

* chore: 添加代码规范等级

* fix: fix dockerfile

* modified:   Dockerfile

* chore: 更新docker ci
This commit is contained in:
Night-stars-1 2023-11-18 18:37:51 +08:00 committed by GitHub
parent d0a491e4ef
commit ea361a0e04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 160 additions and 142 deletions

View File

@ -10,7 +10,6 @@ name: DockerHub CI
on:
release:
types: [published]
workflow_dispatch:
env:
DOCKERHUB_REPO: o1si/miui-auto-tasks
@ -19,25 +18,6 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
-
name: Set up python 3.11 run the program once to create the `/data`
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install pipreqs
run: |
pip install pipreqs
- name: Export requirements.txt
run: |
pipreqs . --encoding=utf8 --force
-
name: run the program once to create the `/data`
run: |
python3 miuitask.py
-
name: Set up QEMU
uses: docker/setup-qemu-action@v2

1
.gitignore vendored
View File

@ -143,3 +143,4 @@ logs
data
.pdm-python
test.py
test2.py

5
.pylintrc Normal file
View File

@ -0,0 +1,5 @@
[FORMAT]
max-line-length=200
[MESSAGES CONTROL]
disable=no-else-return, too-few-public-methods

View File

@ -1,17 +1,22 @@
FROM python:3.9-alpine
RUN apk add --no-cache gcc musl-dev python3-dev libffi-dev
RUN pip install pdm
COPY ./utils /srv/utils/
COPY ./requirements.txt /tmp
COPY ./miuitask.py /srv/
VOLUME ["./data", "/srv/data"]
RUN pip install --no-cache-dir -i https://mirrors.bfsu.edu.cn/pypi/web/simple -r /tmp/requirements.txt && \
rm -rf /tmp/* && \
echo "0 4 * * * python /srv/miuitask.py" > /var/spool/cron/crontabs/root
COPY pyproject.toml pdm.lock /srv/
WORKDIR /srv
RUN
RUN pdm install --prod && \
echo "0 4 * * * python /srv/miuitask.py" > /var/spool/cron/crontabs/root
VOLUME ["./data", "/srv/data"]
CMD ["/usr/sbin/crond", "-f"]

View File

@ -1,8 +1,11 @@
# MIUI Task
一个适用于 社区 4.0 模拟网络功能请求的脚本
[![996.icu](https://img.shields.io/badge/link-996.icu-red.svg)](https://996.icu) ![GitHub](https://img.shields.io/github/license/0-8-4/miui-auto-tasks)
![Python](https://img.shields.io/badge/python-3.7+-blue) ![DockerHub](https://github.com/0-8-4/miui-auto-tasks/actions/workflows/docker-image.yml/badge.svg)
[![996.icu](https://img.shields.io/badge/link-996.icu-red.svg)](https://996.icu)
![GitHub](https://img.shields.io/github/license/0-8-4/miui-auto-tasks)
![Python](https://img.shields.io/badge/python-3.7+-blue)
![DockerHub](https://github.com/0-8-4/miui-auto-tasks/actions/workflows/docker-image.yml/badge.svg)
[![CodeFactor](https://www.codefactor.io/repository/github/0-8-4/miui-auto-tasks/badge)](https://www.codefactor.io/repository/github/0-8-4/miui-auto-tasks)
## 我们收到反馈,部分用户已收到通知要求不得继续随意调用社区接口,否则社区账户将被永久封禁。<br/>鉴于以上情况,我们作为项目维护者建议停用脚本。<br/>感谢大家的支持谢谢所有Star和Fork的人。

View File

@ -1,4 +1,4 @@
# new Env("MIUI-Auto-Task")
# new Env("MIUI-Auto-Task") # pylint: disable=missing-module-docstring
# cron 30 8 * * * miuitask.py
import asyncio
@ -15,6 +15,7 @@ _conf = ConfigManager.data_obj
async def main():
"""启动签到"""
print_info()
for account in _conf.accounts:
login_obj = Login(account)

0
utils/api/__init__.py Normal file
View File

View File

@ -4,7 +4,7 @@ LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2023-11-13 12:32:26
"""
from os import getenv
from typing import Dict, List, Optional, Union
from typing import Dict, Union
import orjson
@ -16,6 +16,8 @@ from .sign import BaseSign
class Login:
"""登录类"""
def __init__(self, account: Account) -> None:
self.account = account
self.user_agent = account.user_agent
@ -24,6 +26,7 @@ class Login:
self.cookies = account.cookies
async def login(self) -> Union[Dict[str, str], bool]:
"""登录小米账号"""
headers = {
'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
'Referer': 'https://account.xiaomi.com/fe/service/login/password?sid=miui_vip&qs=%253Fcallback%253Dhttp'
@ -72,7 +75,7 @@ class Login:
response = await post('https://account.xiaomi.com/pass/serviceLoginAuth2', headers=headers, data=data)
log.debug(response.text)
result = response.text.lstrip('&').lstrip('START').lstrip('&')
data = orjson.loads(result)
data = orjson.loads(result) # pylint: disable=no-member
api_data = LoginResultHandler(data)
if api_data.success:
log.success('小米账号登录成功')
@ -81,23 +84,22 @@ class Login:
write_plugin_data()
return cookies
elif not api_data.pwd_wrong:
log.error('小米账号登录失败:' + api_data.message)
return False
log.error(f'小米账号登录失败:{api_data.message}')
elif api_data.need_captcha:
log.error('当前账号需要短信验证码, 请尝试修改UA或设备ID')
return False
else:
log.error('小米账号登录失败:用户名或密码不正确')
return False
except Exception:
return False
except Exception: # pylint: disable=broad-exception-caught
log.exception("登录小米账号出错")
return False
async def get_cookie(self, url: str) -> Union[Dict[str, str], bool]:
"""获取社区 Cookie"""
try:
response = await get(url, follow_redirects=False)
log.debug(response.text)
return dict(response.cookies)
except Exception:
except Exception: # pylint: disable=broad-exception-caught
log.exception("社区获取 Cookie 失败")
return False

View File

@ -1,6 +1,8 @@
"""签到实例"""
import time
from typing import Dict, List, Optional, Set, Type, Union
from typing import Dict, List, Optional, Type, Union
from ..data_model import ApiResultHandler, DailyTasksResult, SignResultHandler
from ..request import get, post
@ -33,6 +35,7 @@ class BaseSign:
}
async def check_daily_tasks(self, nolog: bool=False) -> Union[List[DailyTasksResult], List[None]]:
"""获取每日任务状态"""
try:
response = await get('https://api.vip.miui.com/mtop/planet/vip/member/getCheckinPageCakeList',
cookies=self.cookie)
@ -45,14 +48,16 @@ class BaseSign:
for daily_task in task['data']:
task_name = daily_task['title']
task_desc = daily_task.get('desc', '')
showType = True if daily_task['showType'] == 0 else False
task_status.append(DailyTasksResult(name=task_name, showType=showType, desc=task_desc))
show_type = True if daily_task['showType'] == 0 else False # pylint: disable=simplifiable-if-expression
task_status.append(DailyTasksResult(name=task_name, showType=show_type, desc=task_desc))
return task_status
else:
log.error("获取每日任务状态失败:" + api_data.message) if not nolog else None
return []
except Exception:
log.exception("获取每日任务异常") if not nolog else None
if not nolog:
log.error(f"获取每日任务状态失败:{api_data.message}")
return []
except Exception: # pylint: disable=broad-exception-caught
if not nolog:
log.exception("获取每日任务异常")
return []
async def sign(self) -> bool:
@ -86,12 +91,12 @@ class BaseSign:
else:
log.error(f"{self.NAME}失败:" + api_data.message)
return False
except Exception:
except Exception: # pylint: disable=broad-exception-caught
log.exception(f"{self.NAME}出错")
return False
class Check_In(BaseSign):
class CheckIn(BaseSign):
"""
每日签到
"""
@ -111,7 +116,7 @@ class Check_In(BaseSign):
URL_SIGN = 'https://api.vip.miui.com/mtop/planet/vip/user/checkinV2'
class Browse_Post(BaseSign):
class BrowsePost(BaseSign):
"""
浏览帖子超过10秒
"""
@ -130,7 +135,7 @@ class Browse_Post(BaseSign):
URL_SIGN = 'https://api.vip.miui.com/mtop/planet/vip/member/addCommunityGrowUpPointByActionV2'
class Browse_User_Page(BaseSign):
class BrowseUserPage(BaseSign):
"""
浏览个人主页10s
"""
@ -149,7 +154,7 @@ class Browse_User_Page(BaseSign):
URL_SIGN = 'https://api.vip.miui.com/mtop/planet/vip/member/addCommunityGrowUpPointByActionV2'
class Browse_Special_Page(BaseSign):
class BrowseSpecialPage(BaseSign):
"""
浏览指定专题页
"""
@ -168,7 +173,7 @@ class Browse_Special_Page(BaseSign):
URL_SIGN = 'https://api.vip.miui.com/mtop/planet/vip/member/addCommunityGrowUpPointByActionV2'
class Board_Follow(BaseSign):
class BoardFollow(BaseSign):
"""
加入小米圈子
"""
@ -184,7 +189,7 @@ class Board_Follow(BaseSign):
URL_SIGN = 'https://api.vip.miui.com/api/community/board/follow'
class Board_UnFollow(BaseSign):
class BoardUnFollow(BaseSign):
"""
退出小米圈子
"""
@ -200,7 +205,7 @@ class Board_UnFollow(BaseSign):
URL_SIGN = 'https://api.vip.miui.com/api/community/board/unfollow'
class Thumb_Up(BaseSign):
class ThumbUp(BaseSign):
"""
点赞他人帖子
"""
@ -216,10 +221,10 @@ class Thumb_Up(BaseSign):
# 注册签到任务
BaseSign.AVAILABLE_SIGNS[Check_In.NAME] = Check_In
BaseSign.AVAILABLE_SIGNS[Browse_Post.NAME] = Browse_Post
BaseSign.AVAILABLE_SIGNS[Browse_User_Page.NAME] = Browse_User_Page
BaseSign.AVAILABLE_SIGNS[Browse_Special_Page.NAME] = Browse_Special_Page
BaseSign.AVAILABLE_SIGNS[Board_Follow.NAME] = Board_Follow
BaseSign.AVAILABLE_SIGNS[Board_UnFollow.NAME] = Board_UnFollow
BaseSign.AVAILABLE_SIGNS[Thumb_Up.NAME] = Thumb_Up
BaseSign.AVAILABLE_SIGNS[CheckIn.NAME] = CheckIn
BaseSign.AVAILABLE_SIGNS[BrowsePost.NAME] = BrowsePost
BaseSign.AVAILABLE_SIGNS[BrowseUserPage.NAME] = BrowseUserPage
BaseSign.AVAILABLE_SIGNS[BrowseSpecialPage.NAME] = BrowseSpecialPage
BaseSign.AVAILABLE_SIGNS[BoardFollow.NAME] = BoardFollow
BaseSign.AVAILABLE_SIGNS[BoardUnFollow.NAME] = BoardUnFollow
BaseSign.AVAILABLE_SIGNS[ThumbUp.NAME] = ThumbUp

View File

@ -1,22 +1,21 @@
'''
Date: 2023-11-13 19:55:22
LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2023-11-14 21:30:31
LastEditTime: 2023-11-18 14:30:41
'''
from .request import get, post
from .request import post
from .logger import log
from .config import ConfigManager
from .data_model import ApiResultHandler, GeetestResult
_conf = ConfigManager.data_obj
async def get_validate(gt: str, challenge: str) -> GeetestResult:
async def get_validate(gt: str, challenge: str) -> GeetestResult: # pylint: disable=invalid-name
"""获取人机验证结果"""
try:
validate = ""
if not _conf.preference.geetest_url:
return GeetestResult(challenge="", validate="")
params = _conf.preference.geetest_params
params = _conf.preference.geetest_params.copy()
for key, value in params.items():
if isinstance(value, str):
params[key] = value.format(gt=gt, challenge=challenge)
@ -35,6 +34,6 @@ async def get_validate(gt: str, challenge: str) -> GeetestResult:
challenge = geetest.data["challenge"]
validate = geetest.data["validate"]
return GeetestResult(challenge=challenge, validate=validate)
except Exception:
except Exception: # pylint: disable=broad-exception-caught
log.exception("获取人机验证结果异常")
return GeetestResult(challenge="", validate="")

View File

@ -1,12 +1,14 @@
""""配置文件"""
import os
from hashlib import md5
from json import JSONDecodeError
from pathlib import Path
from typing import Dict, List, Optional, Union
from hashlib import md5
import yaml
from loguru import logger as log
from orjson import JSONDecodeError
from pydantic import BaseModel, ValidationError, validator
from pydantic import (BaseModel, # pylint: disable=no-name-in-module
ValidationError, validator)
ROOT_PATH = Path(__name__).parent.absolute()
@ -18,9 +20,11 @@ CONFIG_PATH = DATA_PATH / "config.yaml" if os.getenv("MIUITASK_CONFIG_PATH") is
def md5_crypto(passwd: str) -> str:
"""MD5加密"""
return md5(passwd.encode('utf8')).hexdigest().upper()
def cookies_to_dict(cookies):
"""将cookies字符串转换为字典"""
cookies_dict = {}
for cookie in cookies.split(';'):
key, value = cookie.strip().split('=', 1) # 分割键和值
@ -28,6 +32,7 @@ def cookies_to_dict(cookies):
return cookies_dict
class Account(BaseModel):
"""账号处理器"""
uid: str = "100000"
"""账户ID 非账户用户名或手机号"""
password: str = ""
@ -37,36 +42,36 @@ class Account(BaseModel):
user_agent: str = 'Mozilla/5.0 (Linux; Android 13) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/116.0.0.0 Safari/537.36'
"""登录社区时所用浏览器的 User-Agent"""
"""功能开关"""
Check_In: bool = False
CheckIn: bool = False
"""社区成长值签到,启用功能意味着你愿意自行承担相关风险"""
Browse_User_Page: bool = False
BrowseUserPage: bool = False
"""社区浏览个人主页10秒启用功能意味着你愿意自行承担相关风险"""
Browse_Post: bool = False
BrowsePost: bool = False
"""社区浏览帖子10秒启用功能意味着你愿意自行承担相关风险"""
Thumb_Up: bool = False
ThumbUp: bool = False
"""点赞帖子,启用功能意味着你愿意自行承担相关风险"""
Browse_Special_Page: bool = False
BrowseSpecialPage: bool = False
"""社区在活动期间可能会出现限时的“浏览指定专题页”任务,启用功能意味着你愿意自行承担相关风险"""
Board_Follow: bool = False
BoardFollow: bool = False
"""社区可能会出现限时的“加入圈子”任务,启用功能意味着你愿意自行承担相关风险"""
carrot_pull: bool = False
carrotpull: bool = False
"""社区拔萝卜,启用功能意味着你愿意自行承担相关风险"""
@validator("password", allow_reuse=True)
def _password(cls, v: Optional[str]):
if len(v) == 32:
return v
return md5_crypto(v)
def _password(cls, value: Optional[str]): # pylint: disable=no-self-argument
if len(value) == 32:
return value
return md5_crypto(value)
@validator("cookies", allow_reuse=True)
def _cookies(cls, v: Union[dict, str]):
if type(v) == str:
return cookies_to_dict(v)
return v
def _cookies(cls, value: Union[dict, str]): # pylint: disable=no-self-argument
if isinstance(value, str):
return cookies_to_dict(value)
return value
class OnePush(BaseModel):
"""推送配置"""
notifier: Union[str, bool] = ""
"""是否开启消息推送"""
params: Dict = {
@ -78,6 +83,7 @@ class OnePush(BaseModel):
"""推送参数"""
class Preference(BaseModel):
"""偏好设置"""
geetest_url: str = ""
"""极验验证URL"""
geetest_params: Dict = {}
@ -87,6 +93,7 @@ class Preference(BaseModel):
class Config(BaseModel):
"""插件数据"""
preference: Preference = Preference()
"""偏好设置"""
accounts: List[Account] = [Account()]
@ -110,14 +117,15 @@ def write_plugin_data(data: Config = None):
except (AttributeError, TypeError, ValueError):
log.exception("数据对象序列化失败,可能是数据类型错误")
return False
with open(CONFIG_PATH, "w") as f:
f.write(str_data)
with open(CONFIG_PATH, "w", encoding="utf-8") as file:
file.write(str_data)
return True
except OSError:
return False
class ConfigManager:
"""配置管理器"""
data_obj = Config()
"""加载出的插件数据对象"""
platform = "pc"
@ -130,11 +138,12 @@ class ConfigManager:
"""
if os.path.exists(DATA_PATH) and os.path.isfile(CONFIG_PATH):
try:
with open(CONFIG_PATH, 'r') as file:
with open(CONFIG_PATH, 'r', encoding="utf-8") as file:
data = yaml.safe_load(file)
new_model = Config.model_validate(data)
for attr in new_model.model_fields:
ConfigManager.data_obj.__setattr__(attr, new_model.__getattribute__(attr))
#ConfigManager.data_obj.__setattr__(attr, new_model.__getattribute__(attr))
setattr(ConfigManager.data_obj, attr, getattr(new_model, attr))
write_plugin_data(ConfigManager.data_obj) # 同步配置
except (ValidationError, JSONDecodeError):
log.exception(f"读取数据文件失败,请检查数据文件 {CONFIG_PATH} 格式是否正确")

View File

@ -1,5 +1,6 @@
"""数据处理模型"""
from typing import (Any, Dict, NamedTuple, Optional)
from pydantic import BaseModel
from pydantic import BaseModel #pylint: disable=no-name-in-module
class ApiResultHandler(BaseModel):
@ -92,14 +93,12 @@ class SignResultHandler(ApiResultHandler):
def __init__(self, content: Dict[str, Any]):
super().__init__(content=content)
self.growth = self.content.get("entity", {})
if type(self.growth) == dict:
if isinstance(self.growth, dict):
self.growth = self.growth.get("score", "未知")
elif type(self.growth) == int:
elif isinstance(self.growth, int):
self.growth = str(self.growth)
# pylint: disable=trailing-whitespace
def __bool__(self):
"""
签到是否成功
@ -128,15 +127,14 @@ class TokenResultHandler(ApiResultHandler):
@property
def need_verify(self):
"""需要验证码"""
return self.data.get("result") == False and self.data.get("url")
return self.data.get("result") is False and self.data.get("url")
@property
def success(self):
"""是否成功获取TOKEN"""
return self.token != ""
class GeetestResult(NamedTuple):
"""人机验证结果数据"""
validate: str
challenge: str

View File

@ -1,20 +1,21 @@
"""
Date: 2023-11-11 23:39:10
'''
Date: 2023-11-12 14:05:06
LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2023-11-12 13:58:56
"""
LastEditTime: 2023-11-18 14:20:44
'''
import os
import sys
from loguru import logger
message = ""
MESSAGE = ""
def LogFilter(record):
global message
def log_filter(record: dict):
"""loguru过滤器"""
global MESSAGE # pylint: disable=global-statement
if record["level"].no >= 20:
message += f"{record.get('message')}\n"
MESSAGE += f"{record.get('message')}\n"
return True
@ -25,8 +26,8 @@ def get_message():
返回:
收集到的消息
"""
global message
return message
global MESSAGE # pylint: disable=global-variable-not-assigned
return MESSAGE
path_log = os.path.join("logs", '日志文件.log')
@ -36,7 +37,7 @@ log.remove()
log.add(sys.stdout, level="INFO", colorize=True,
format="<cyan>{module}</cyan>.<cyan>{function}</cyan>"
":<cyan>{line}</cyan> - "
"<level>{message}</level>", filter=LogFilter)
"<level>{message}</level>", filter=log_filter)
log.add(path_log, level="DEBUG",
format="{time:HH:mm:ss} - "

View File

@ -1,16 +1,15 @@
"""
Date: 2023-11-11 23:34:08
'''
Date: 2023-11-12 14:05:06
LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2023-11-13 18:17:50
"""
import httpx
LastEditTime: 2023-11-18 00:32:53
'''
from typing import Any, Dict, Optional
import httpx
from onepush import notify
from .logger import log
from .config import ConfigManager
from .logger import log
_conf = ConfigManager.data_obj
@ -73,5 +72,5 @@ def notify_me(content=""):
params = _conf.ONEPUSH.params
if not notifier or not params:
log.error('未配置推送或未正确配置推送')
return
return False
return notify(notifier, content=content, **params)

View File

@ -1,7 +1,7 @@
'''
Date: 2023-11-13 20:29:19
LastEditors: Night-stars-1 nujj1042633805@gmail.com
LastEditTime: 2023-11-14 21:13:35
LastEditTime: 2023-11-18 14:22:37
'''
import platform
from urllib.request import getproxies
@ -9,7 +9,8 @@ from utils.logger import log
def print_info():
log.info("MIUI-AUTO-TASK v1.7.1.1")
"""打印系统信息"""
log.info("MIUI-AUTO-TASK v1.7.1")
log.info('---------- 系统信息 -------------')
system_info()
log.info('---------- 项目信息 -------------')
@ -20,6 +21,7 @@ def print_info():
def system_info():
"""系统信息"""
log.info(show_info('操作系统平台', platform.platform()))
log.info(show_info('操作系统版本', platform.version()))
log.info(show_info('操作系统名称', platform.system()))
@ -31,5 +33,6 @@ def system_info():
log.info(show_info('系统代理', getproxies()))
def show_info(tip, info):
return "{}: {}".format(tip, info)
def show_info(tip: str, info: str):
"""格式化输出"""
return f"{tip}: {info}"

View File

@ -1,21 +1,23 @@
"""工具类"""
import base64
import random
import time
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives import padding, serialization
from cryptography.hazmat.backends import default_backend
import base64
from urllib.parse import urlparse, parse_qsl
from pydantic import ValidationError
from typing import Type
from urllib.parse import parse_qsl, urlparse
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding, serialization
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from pydantic import ValidationError
from tenacity import RetryError, Retrying, stop_after_attempt
from .request import get, post
from .captcha import get_validate
from .data_model import TokenResultHandler
from .logger import log
from .captcha import get_validate
from .request import post
public_key_pem = '''-----BEGIN PUBLIC KEY-----
PUBLIC_KEY_PEM = '''-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEArxfNLkuAQ/BYHzkzVwtu
g+0abmYRBVCEScSzGxJIOsfxVzcuqaKO87H2o2wBcacD3bRHhMjTkhSEqxPjQ/FE
XuJ1cdbmr3+b3EQR6wf/cYcMx2468/QyVoQ7BADLSPecQhtgGOllkC+cLYN6Md34
@ -44,12 +46,14 @@ headers = {
}
def get_random_chars_as_string(count: int) -> str:
"""获取随机字符串"""
characters = list('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890!@#,%^&*()-=_+~`{}[]|:<>.?/')
selected_chars = random.sample(characters, count)
return ''.join(selected_chars)
def aes_encrypt(key, data) -> base64:
iv = b'0102030405060708'
def aes_encrypt(key: str, data: str) -> base64:
"""AES加密"""
iv = b'0102030405060708' # pylint: disable=invalid-name
cipher = Cipher(algorithms.AES(key.encode('utf-8')), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
padder = padding.PKCS7(algorithms.AES.block_size).padder()
@ -57,7 +61,8 @@ def aes_encrypt(key, data) -> base64:
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return base64.b64encode(ciphertext).decode('utf-8')
def rsa_encrypt(public_key_pem, data: str) -> base64:
def rsa_encrypt(public_key_pem: str, data: str) -> base64:
"""RSA加密"""
public_key = serialization.load_pem_public_key(
public_key_pem.encode('utf-8'),
backend=default_backend()
@ -82,10 +87,11 @@ def is_incorrect_return(exception: Exception, *addition_exceptions: Type[Excepti
return isinstance(exception, exceptions) or isinstance(exception.__cause__, exceptions)
async def get_token_by_captcha(url: str) -> str:
"""通过人机验证码获取TOKEN"""
try:
parsed_url = urlparse(url)
query_params = dict(parse_qsl(parsed_url.query)) # 解析URL参数
gt = query_params.get("c")
gt = query_params.get("c") # pylint: disable=invalid-name
challenge = query_params.get("l")
geetest_data = await get_validate(gt, challenge)
params = {
@ -112,11 +118,12 @@ async def get_token_by_captcha(url: str) -> str:
else:
log.error("遇到未知错误无法获取TOKEN")
return False
except Exception:
except Exception: # pylint: disable=broad-exception-caught
log.exception("获取TOKEN异常")
return False
# pylint: disable=trailing-whitespace
async def get_token(uid: str) -> str:
"""获取TOKEN"""
try:
for attempt in Retrying(stop=stop_after_attempt(3)):
with attempt:
@ -196,7 +203,7 @@ async def get_token(uid: str) -> str:
data = {
's': rsa_encrypt(public_key_pem, key),
's': rsa_encrypt(PUBLIC_KEY_PEM, key),
'd': aes_encrypt(key, str(data)),
'a': 'GROW_UP_CHECKIN',
}
@ -215,8 +222,8 @@ async def get_token(uid: str) -> str:
else:
log.error("遇到未知错误无法获取TOKEN")
return False
except RetryError as e:
if is_incorrect_return(e):
except RetryError as error:
if is_incorrect_return(error):
log.exception(f"TOKEN - 服务器没有正确返回 {response.text}")
else:
log.exception("获取TOKEN异常")