Add verification pass plugin

 添加验证通过插件

Co-authored-by: xtaodada <xtao@xtaolink.cn>
This commit is contained in:
洛水居室 2022-11-12 20:59:42 +08:00 committed by GitHub
parent 97257141b2
commit 38541428b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 296 additions and 38 deletions

View File

@ -1,8 +1,11 @@
import hashlib
import json
import random
import string
import time
import uuid
from typing import Mapping
from urllib.parse import urlencode
RECOGNIZE_SERVER = {
"1": "cn_gf01",
@ -15,14 +18,8 @@ RECOGNIZE_SERVER = {
}
def get_device_id(name: str) -> str:
return str(uuid.uuid3(uuid.NAMESPACE_URL, name)).replace("-", "").upper()
def md5(text: str) -> str:
_md5 = hashlib.md5() # nosec B303
_md5.update(text.encode())
return _md5.hexdigest()
def get_device_id(name: str = None):
return str(uuid.uuid3(uuid.NAMESPACE_URL, name))
def random_text(num: int) -> str:
@ -33,18 +30,52 @@ def timestamp() -> int:
return int(time.time())
def get_ds(salt: str = "", web: int = 1) -> str:
if salt == "":
if web == 1:
salt = "h8w582wxwgqvahcdkpvdhbh2w9casgfl"
elif web == 2:
salt = "h8w582wxwgqvahcdkpvdhbh2w9casgfl"
elif web == 3:
salt = "fd3ykrh7o1j54g581upo1tvpam0dsgtf"
i = str(timestamp())
r = random_text(6)
c = md5("salt=" + salt + "&t=" + i + "&r=" + r)
return f"{i},{r},{c}"
def _hexdigest(text):
_md5 = hashlib.md5() # nosec B303
_md5.update(text.encode())
return _md5.hexdigest()
def get_ds(ds_type: str = None, new_ds: bool = False, data: Mapping[str, str] = None, params: Mapping[str, str] = None):
# 1: ios
# 2: android
# 4: pc web
# 5: mobile web
def new():
t = str(int(time.time()))
r = str(random.randint(100001, 200000)) # nosec
b = json.dumps(data) if data else ''
q = urlencode(params) if params else ''
c = _hexdigest(f'salt={salt}&t={t}&r={r}&b={b}&q={q}')
return f'{t},{r},{c}'
def old():
t = str(int(time.time()))
r = ''.join(random.sample(string.ascii_lowercase + string.digits, 6))
c = _hexdigest(f'salt={salt}&t={t}&r={r}')
return f'{t},{r},{c}'
app_version = '2.36.1'
client_type = '5'
salt = 'YVEIkzDFNHLeKXLxzqCA9TzxCpWwbIbk'
ds = old()
if ds_type in ('android', '2'):
app_version = '2.36.1'
client_type = '2'
salt = 'n0KjuIrKgLHh08LWSCYP0WXlVXaYvV64'
ds = old()
if ds_type == 'android_new':
app_version = '2.36.1'
client_type = '2'
salt = 't0qEgfub6cvueAPgR5m9aQWWVciEer7v'
ds = new()
if new_ds:
app_version = '2.36.1'
client_type = '5'
salt = 'xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs'
ds = new()
return app_version, client_type, ds
def get_recognize_server(uid: int) -> str:

View File

@ -1,4 +1,5 @@
import asyncio
import json
import re
import time
from datetime import datetime
@ -12,9 +13,8 @@ from httpx import AsyncClient
from pydantic import BaseModel, validator
from modules.apihelper.base import ArtworkImage, PostInfo
from modules.apihelper.helpers import get_device_id
from modules.apihelper.helpers import get_device_id, get_ds
from modules.apihelper.request.hoyorequest import HOYORequest
from utils.log import logger
from utils.typedefs import JSONDict
@ -126,7 +126,7 @@ class Hyperion:
async def download_image(self, art_id: int, url: str, page: int = 0) -> ArtworkImage:
response = await self.client.get(url, params=self.get_images_params(resize=2000), timeout=10, de_json=False)
return ArtworkImage(art_id=art_id, page=page, data=response)
return ArtworkImage(art_id=art_id, page=page, data=response.content)
async def get_new_list(self, gids: int, type_id: int, page_size: int = 20):
"""
@ -353,7 +353,92 @@ class SignIn:
)
return data.get("authkey")
except JSONDecodeError:
logger.warning("Stoken 获取 Authkey JSON解析失败")
pass
except InvalidCookies:
logger.warning("Stoken 获取 Authkey 失败 | 用户 Stoken 失效")
pass
return None
class Verification:
HOST = "api-takumi-record.mihoyo.com"
VERIFICATION_HOST = "api.geetest.com"
CREATE_VERIFICATION_URL = "/game_record/app/card/wapi/createVerification"
VERIFY_VERIFICATION_URL = "/game_record/app/card/wapi/verifyVerification"
AJAX_URL = "/ajax.php"
USER_AGENT = (
"User-Agent: Mozilla/5.0 (Linux; Android 12; Mi 10 Build/SKQ1.211006.001; wv) "
"AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 "
"miHoYoBBS/2.33.1"
)
BBS_HEADERS = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"User-Agent": USER_AGENT,
"X-Requested-With": "com.mihoyo.hyperion",
"Referer": "https://webstatic.mihoyo.com/",
"x-rpc-device_id": get_device_id(USER_AGENT),
"x-rpc-page": "3.1.3_#/ys",
}
VERIFICATION_HEADERS = {
"Accept": "*/*",
"X-Requested-With": "com.mihoyo.hyperion",
"User-Agent": USER_AGENT,
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
}
def __init__(self, cookie: Dict = None):
self.client = HOYORequest(headers=self.BBS_HEADERS, cookies=cookie)
def get_verification_headers(self, referer: str):
headers = self.VERIFICATION_HEADERS.copy()
headers["Referer"] = referer
return headers
def get_headers(self, data: dict = None, params: dict = None):
headers = self.BBS_HEADERS.copy()
app_version, client_type, ds = get_ds(new_ds=True, data=data, params=params)
headers["x-rpc-app_version"] = app_version
headers["x-rpc-client_type"] = client_type
headers["DS"] = ds
return headers
@staticmethod
def get_url(host: str, url: str):
return f"https://{host}{url}"
async def create(self):
url = self.get_url(self.HOST, self.CREATE_VERIFICATION_URL)
params = {"is_high": "true"}
headers = self.get_headers(params=params)
response = await self.client.get(url, params=params, headers=headers)
return response
async def verify(self, challenge: str, validate: str):
url = self.get_url(self.HOST, self.VERIFY_VERIFICATION_URL)
data = {"geetest_challenge": challenge, "geetest_validate": validate, "geetest_seccode": f"{validate}|jordan"}
headers = self.get_headers(data=data)
response = await self.client.post(url, json=data, headers=headers)
return response
async def ajax(self, referer: str, gt: str, challenge: str) -> Optional[str]:
headers = self.get_verification_headers(referer)
url = self.get_url(self.VERIFICATION_HOST, self.AJAX_URL)
params = {
"gt": gt,
"challenge": challenge,
"lang": "zh-cn",
"pt": 3,
"client_type": "web_mobile",
"callback": f"geetest_{int(time.time() * 1000)}",
}
response = await self.client.get(url, headers=headers, params=params, de_json=False)
text = response.text
json_data = re.findall(r"^.*?\((\{.*?)\)$", text)[0]
data = json.loads(json_data)
if "success" in data["status"] and "success" in data["data"]["result"]:
return data["data"]["validate"]
return None

View File

@ -1,6 +1,7 @@
from typing import Union
import httpx
from httpx import Response
from modules.apihelper.error import NetworkException, ResponseException, APIHelperTimedOut
from modules.apihelper.request.httpxrequest import HTTPXRequest
@ -10,7 +11,7 @@ from modules.apihelper.typedefs import POST_DATA, JSON_DATA
class HOYORequest(HTTPXRequest):
async def get(
self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
) -> Union[POST_DATA, JSON_DATA, bytes]:
) -> Union[POST_DATA, JSON_DATA, Response]:
try:
response = await self._client.get(url=url, *args, **kwargs)
except httpx.TimeoutException as err:
@ -20,7 +21,35 @@ class HOYORequest(HTTPXRequest):
if response.is_error:
raise ResponseException(message=f"response error in status code: {response.status_code}")
if not de_json:
return response.content
return response
json_data = response.json()
return_code = json_data.get("retcode", None)
data = json_data.get("data", None)
message = json_data.get("message", None)
if return_code is None:
return json_data
if return_code != 0:
if message is None:
raise ResponseException(message=f"response error in return code: {return_code}")
else:
raise ResponseException(response=json_data)
if not re_json_data and data is not None:
return data
return json_data
async def post(
self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
) -> Union[POST_DATA, JSON_DATA, Response]:
try:
response = await self._client.post(url=url, *args, **kwargs)
except httpx.TimeoutException as err:
raise APIHelperTimedOut from err
except httpx.HTTPError as exc:
raise NetworkException(f"Unknown error in HTTP implementation: {repr(exc)}") from exc
if response.is_error:
raise ResponseException(message=f"response error in status code: {response.status_code}")
if not de_json:
return response
json_data = response.json()
return_code = json_data.get("retcode", None)
data = json_data.get("data", None)

View File

@ -0,0 +1,74 @@
from typing import Tuple, Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import CallbackContext
from core.base.redisdb import RedisDB
from core.baseplugin import BasePlugin
from core.config import config
from core.cookies import CookiesService
from core.plugin import Plugin, handler
from core.user import UserService
from modules.apihelper.hyperion import Verification
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.models.base import RegionEnum
class VerificationSystem:
def __init__(self, redis: RedisDB = None):
self.cache = redis.client
self.qname = "plugin:verification:"
async def get_challenge(self, uid: int) -> Tuple[Optional[str], Optional[str]]:
data = await self.cache.get(f"{self.qname}{uid}")
if not data:
return None, None
data = data.decode("utf-8").split("|")
return data[0], data[1]
async def set_challenge(self, uid: int, gt: str, challenge: str):
await self.cache.set(f"{self.qname}{uid}", f"{gt}|{challenge}")
await self.cache.expire(f"{self.qname}{uid}", 10 * 60)
class VerificationPlugins(Plugin, BasePlugin):
def __init__(self, user_service: UserService = None, cookies_service: CookiesService = None, redis: RedisDB = None):
self.cookies_service = cookies_service
self.user_service = user_service
self.system = VerificationSystem(redis)
@handler.command("verify", block=False)
@restricts(restricts_time=60)
@error_callable
async def verify(self, update: Update, context: CallbackContext) -> None:
user = update.effective_user
message = update.effective_message
user_info = await self.user_service.get_user_by_id(user.id)
if user_info.region != RegionEnum.HYPERION:
await message.reply_text("非法用户")
return
uid = user_info.yuanshen_uid
cookie = await self.cookies_service.get_cookies(user.id, RegionEnum.HYPERION)
client = Verification(cookie=cookie.cookies)
if context.args and len(context.args) > 0:
validate = context.args[0]
_, challenge = await self.system.get_challenge(uid)
if challenge:
await client.verify(challenge, validate)
await message.reply_text("验证成功")
else:
await message.reply_text("验证失效")
return
data = await client.create()
challenge = data["challenge"]
gt = data["gt"]
validate = await client.ajax(referer="https://webstatic.mihoyo.com/", gt=gt, challenge=challenge)
if validate:
await client.verify(challenge, validate)
await message.reply_text("验证成功")
return
await self.system.set_challenge(uid, gt, challenge)
url = f"{config.pass_challenge_user_web}?username={context.bot.username}&command=verify&gt={gt}&challenge={challenge}&uid={uid}"
button = InlineKeyboardMarkup([[InlineKeyboardButton("验证", url=url)]])
await message.reply_text("请尽快点击下方手动验证", reply_markup=button)

View File

@ -4,18 +4,26 @@ from telegram.ext import CallbackContext, CommandHandler
from telegram.helpers import escape_markdown
from core.base.redisdb import RedisDB
from core.cookies import CookiesService
from core.cookies.error import CookiesNotFoundError
from core.plugin import handler, Plugin
from core.user import UserService
from core.user.error import UserNotFoundError
from modules.apihelper.hyperion import Verification
from plugins.genshin.sign import SignSystem, NeedChallenge
from plugins.genshin.verification import VerificationSystem
from utils.decorators.restricts import restricts
from utils.helpers import get_genshin_client
from utils.log import logger
from utils.models.base import RegionEnum
class StartPlugin(Plugin):
def __init__(self, redis: RedisDB = None):
def __init__(self, user_service: UserService = None, cookies_service: CookiesService = None, redis: RedisDB = None):
self.cookies_service = cookies_service
self.user_service = user_service
self.sign_system = SignSystem(redis)
self.verification_system = VerificationSystem(redis)
@handler(CommandHandler, command="start", block=False)
@restricts()
@ -39,10 +47,21 @@ class StartPlugin(Plugin):
f"你好 {user.mention_markdown_v2()} {escape_markdown('!我是派蒙 ')}\n"
f"{escape_markdown('发送 /setuid 或 /setcookie 命令进入绑定账号流程')}"
)
elif args[0] == "verify_verification":
await message.reply_markdown_v2(
f"你好 {user.mention_markdown_v2()} {escape_markdown('!我是派蒙 ')}\n"
f"{escape_markdown('发送 /verify 命令进入认证流程')}"
)
elif args[0] == "sign":
await self.gen_sign_button(message, user)
elif args[0].startswith("challenge_"):
await self.process_sign_validate(message, user, args[0][10:])
_data = args[0].split("_")
_command = _data[1]
_challenge = _data[2]
if _command == "sign":
await self.process_sign_validate(message, user, _challenge)
elif _command == "verify":
await self.process_validate(message, user, _challenge)
else:
await message.reply_html(f"你好 {user.mention_html()} !我是派蒙 \n请点击 /{args[0]} 命令进入对应流程")
return
@ -78,7 +97,7 @@ class StartPlugin(Plugin):
return
await message.reply_text("请尽快点击下方按钮进行验证。", allow_sending_without_reply=True, reply_markup=button)
except (UserNotFoundError, CookiesNotFoundError):
logger.warning(f"用户 {user.full_name}[{user.id}] 账号信息未找到")
logger.warning("用户 %s[%s] 账号信息未找到", user.full_name, user.id)
async def process_sign_validate(self, message: Message, user: User, validate: str):
try:
@ -91,6 +110,21 @@ class StartPlugin(Plugin):
sign_text = await self.sign_system.start_sign(client, headers=headers)
await message.reply_text(sign_text, allow_sending_without_reply=True)
except (UserNotFoundError, CookiesNotFoundError):
logger.warning(f"用户 {user.full_name}[{user.id}] 账号信息未找到")
logger.warning("用户 %s[%s] 账号信息未找到", user.full_name, user.id)
except NeedChallenge:
await message.reply_text("回调错误,请重新签到", allow_sending_without_reply=True)
async def process_validate(self, message: Message, user: User, validate: str):
user_info = await self.user_service.get_user_by_id(user.id)
if user_info.region != RegionEnum.HYPERION:
await message.reply_text("非法用户")
return
uid = user_info.yuanshen_uid
cookie = await self.cookies_service.get_cookies(user.id, RegionEnum.HYPERION)
client = Verification(cookie=cookie.cookies)
_, challenge = await self.verification_system.get_challenge(uid)
if challenge:
await client.verify(challenge, validate)
await message.reply_text("验证成功")
else:
await message.reply_text("验证失效")

View File

@ -17,9 +17,16 @@ from utils.log import logger
async def send_user_notification(update: Update, context: CallbackContext, text: str):
if update.inline_query is not None: # 忽略 inline_query
return
buttons = InlineKeyboardMarkup(
[[InlineKeyboardButton("点我重新绑定", url=f"https://t.me/{context.bot.username}?start=set_cookie")]]
)
if "重新绑定" in text:
buttons = InlineKeyboardMarkup(
[[InlineKeyboardButton("点我重新绑定", url=f"https://t.me/{context.bot.username}?start=set_cookie")]]
)
elif "通过验证" in text:
buttons = InlineKeyboardMarkup(
[[InlineKeyboardButton("点我通过验证", url=f"https://t.me/{context.bot.username}?start=verify_verification")]]
)
else:
buttons = ReplyKeyboardRemove()
user = update.effective_user
message = update.effective_message
chat = update.effective_chat
@ -29,9 +36,7 @@ async def send_user_notification(update: Update, context: CallbackContext, text:
return
logger.info(f"尝试通知用户 {user.full_name}[{user.id}] " f"{chat.full_name}[{chat.id}]" f"的 错误信息[{text}]")
try:
await message.reply_text(
text, reply_markup=buttons if "重新绑定" in text else ReplyKeyboardRemove(), allow_sending_without_reply=True
)
await message.reply_text(text, reply_markup=buttons, allow_sending_without_reply=True)
except (BadRequest, Forbidden, Exception) as exc:
logger.error(f"发送 update_id[{update.update_id}] 错误信息失败 错误信息为")
logger.exception(exc)
@ -105,7 +110,7 @@ def error_callable(func: Callable) -> Callable:
if exc.retcode == -130:
await send_user_notification(update, context, "出错了呜呜呜 ~ 未设置默认角色,请尝试重新绑定")
elif exc.retcode == 1034:
await send_user_notification(update, context, "出错了呜呜呜 ~ 服务器检测到该账号可能存在异常,请求被拒绝")
await send_user_notification(update, context, "出错了呜呜呜 ~ 服务器检测到该账号可能存在异常,请求被拒绝,请尝试通过验证")
else:
logger.error("GenshinException")
logger.exception(exc)