♻ 重写 apihelper 模块

This commit is contained in:
洛水居室 2022-10-08 08:59:08 +08:00 committed by GitHub
parent 0fe61d5f44
commit 29a5508174
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 386 additions and 539 deletions

View File

@ -16,9 +16,7 @@ class GameStrategyService:
async def _get_strategy_from_hyperion(self, collection_id: int, character_name: str) -> int:
post_id: int = -1
post_full_in_collection = await self._hyperion.get_post_full_in_collection(collection_id)
if post_full_in_collection.error:
return post_id
for post_data in post_full_in_collection.data["posts"]:
for post_data in post_full_in_collection["posts"]:
topics = post_data["topics"]
for topic in topics:
if character_name == topic["name"]:
@ -40,9 +38,9 @@ class GameStrategyService:
else:
return ""
artwork_info = await self._hyperion.get_artwork_info(2, post_id)
await self._cache.set_url_list(character_name, artwork_info.results.image_url_list)
return artwork_info.results.image_url_list[0]
artwork_info = await self._hyperion.get_post_info(2, post_id)
await self._cache.set_url_list(character_name, artwork_info.image_urls)
return artwork_info.image_urls[0]
class GameMaterialService:
@ -55,9 +53,7 @@ class GameMaterialService:
async def _get_material_from_hyperion(self, collection_id: int, character_name: str) -> int:
post_id: int = -1
post_full_in_collection = await self._hyperion.get_post_full_in_collection(collection_id)
if post_full_in_collection.error:
return post_id
for post_data in post_full_in_collection.data["posts"]:
for post_data in post_full_in_collection["posts"]:
topics = post_data["topics"]
for topic in topics:
if character_name == topic["name"]:
@ -84,9 +80,9 @@ class GameMaterialService:
else:
return ""
artwork_info = await self._hyperion.get_artwork_info(2, post_id)
await self._cache.set_url_list(character_name, artwork_info.results.image_url_list)
image_url_list = artwork_info.results.image_url_list
artwork_info = await self._hyperion.get_post_info(2, post_id)
await self._cache.set_url_list(character_name, artwork_info.image_urls)
image_url_list = artwork_info.image_urls
if len(image_url_list) == 0:
return ""
elif len(image_url_list) == 1:

View File

@ -50,7 +50,8 @@ class ArtifactOcrRate:
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'sec-gpc': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.115 Safari/537.36',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/104.0.5112.115 Safari/537.36',
}
def __init__(self):

View File

@ -1,158 +1,50 @@
import imghdr
from enum import Enum
from typing import List, Any
from pydantic import BaseModel, PrivateAttr
class ArtworkImage:
class ArtworkImage(BaseModel):
art_id: int
page: int = 0
data: bytes = b""
is_error: bool = False
def __init__(self, art_id: int, page: int = 0, is_error: bool = False, data: bytes = b""):
"""
:param art_id: 插画ID
:param page: 页数
:param is_error: 插画是否有问题
:param data: 插画数据
"""
self.art_id = art_id
self.data = data
self.is_error = is_error
if not is_error:
self.format: str = imghdr.what(None, self.data)
self.page = page
class BaseResponseData:
def __init__(self, response=None, error_message: str = ""):
"""
:param response: 相应
:param error_message: 错误信息
"""
if response is None:
self.error: bool = True
self.message: str = error_message
return
self.response: dict = response
self.code = response["retcode"]
if self.code == 0:
self.error = False
@property
def format(self) -> str:
if self.is_error:
return ""
else:
self.error = True
self.message = response["message"]
self.data = response["data"]
imghdr.what(None, self.data)
class Stat:
def __init__(self, view_num: int = 0, reply_num: int = 0, like_num: int = 0, bookmark_num: int = 0,
forward_num: int = 0):
self.forward_num = forward_num # 关注数
self.bookmark_num = bookmark_num # 收藏数
self.like_num = like_num # 喜欢数
self.reply_num = reply_num # 回复数
self.view_num = view_num # 观看数
class PostInfo(BaseModel):
_data: dict = PrivateAttr()
post_id: int
user_uid: int
subject: str
image_urls: List[str]
created_at: int
def __init__(self, _data: dict, **data: Any):
super().__init__(**data)
self._data = _data
class ArtworkInfo:
def __init__(self, post_id: int = 0, subject: str = "", tags=None,
image_url_list=None, stat: Stat = None, uid: int = 0, created_at: int = 0):
"""
:param post_id: post_id
:param subject: 标题
:param tags: 标签
:param image_url_list: 图片URL列表
:param stat: 统计
:param uid: 用户UID
:param created_at: 创建时间
"""
if tags is None:
self.tags = []
else:
self.tags = tags
if image_url_list is None:
self.image_url_list = []
else:
self.image_url_list = image_url_list
self.Stat = stat
self.created_at = created_at
self.uid = uid
self.subject = subject
self.post_id = post_id
class HyperionResponse:
"""
:param response: 相应
:param error_message: 错误信息
"""
def __init__(self, response=None, error_message: str = ""):
if response is None:
self.error: bool = True
self.message: str = error_message
return
self.response: dict = response
self.code = response["retcode"]
if self.code == 0:
self.error = False
else:
if self.code == 1102:
self.message = "作品不存在"
self.error = True
return
if response["data"] is None:
self.error = True
self.message: str = response["message"]
if self.error:
return
try:
self._data_post = response["data"]["post"]
post = self._data_post["post"] # 投稿信息
post_id = post["post_id"]
subject = post["subject"] # 介绍类似title标题
created_at = post["created_at"] # 创建时间
user = self._data_post["user"] # 用户数据
uid = user["uid"] # 用户ID
topics = self._data_post["topics"] # 存放 Tag
image_list = self._data_post["image_list"] # image_list
except (AttributeError, TypeError) as err:
self.error: bool = True
self.message: str = err
return
topics_list = []
image_url_list = []
for topic in topics:
topics_list.append(topic["name"])
@classmethod
def paste_data(cls, data: dict) -> "PostInfo":
image_urls = []
_data_post = data["post"]
post = _data_post["post"]
post_id = post["post_id"]
subject = post["subject"]
image_list = _data_post["image_list"]
for image in image_list:
image_url_list.append(image["url"])
self.post_id = post["post_id"]
self.user_id = user["uid"]
self.created_at = post["created_at"]
stat = Stat(view_num=self._data_post["stat"]["view_num"],
reply_num=self._data_post["stat"]["reply_num"],
like_num=self._data_post["stat"]["like_num"],
bookmark_num=self._data_post["stat"]["bookmark_num"],
forward_num=self._data_post["stat"]["forward_num"],
)
self.results = ArtworkInfo(
subject=subject,
created_at=created_at,
uid=uid,
stat=stat,
tags=topics_list,
post_id=post_id,
image_url_list=image_url_list
)
image_urls.append(image["url"])
created_at = post["created_at"]
user = _data_post["user"] # 用户数据
user_uid = user["uid"] # 用户ID
return PostInfo(_data=data, post_id=post_id, user_uid=user_uid, subject=subject, image_urls=image_urls,
created_at=created_at)
def __bool__(self):
"""
:return: 是否错误
"""
return self.error
def __len__(self):
"""
:return: 插画连接数量
"""
return len(self.results.image_url_list)
class ServiceEnum(Enum):
HYPERION = 1
HOYOLAB = 2
def __getitem__(self, item):
return self._data[item]

View File

@ -0,0 +1,41 @@
from typing import Mapping, Any, Optional
class APIHelperException(Exception):
pass
class NetworkException(APIHelperException):
pass
class TimedOut(APIHelperException):
pass
class ResponseException(APIHelperException):
code: int = 0
message: str = ""
def __init__(self, response: Optional[Mapping[str, Any]] = None, message: Optional[str] = None) -> None:
if response is None:
self.message = message
_message = message
else:
self.code = response.get("retcode", self.code)
self.message = response.get("message", "")
_message = f"[{self.code}] {self.message}"
super().__init__(_message)
class DataNotFoundError(ResponseException):
def __init__(self):
message = "response data not find"
super().__init__(message=message)
class ReturnCodeError(ResponseException):
def __init__(self):
message = "response return code error"
super().__init__(message=message)

View File

@ -1,44 +0,0 @@
import time
import httpx
from modules.apihelper.base import BaseResponseData
class GachaInfo:
GACHA_LIST_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/gacha/list.json"
GACHA_INFO_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/%s/zh-cn.json"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/90.0.4430.72 Safari/537.36"
def __init__(self):
self.headers = {
'User-Agent': self.USER_AGENT,
}
self.client = httpx.AsyncClient(headers=self.headers)
self.cache = {}
self.cache_ttl = 600
async def get_gacha_list_info(self) -> BaseResponseData:
if self.cache.get("time", 0) + self.cache_ttl < time.time():
self.cache.clear()
cache = self.cache.get("gacha_list_info")
if cache is not None:
return BaseResponseData(cache)
req = await self.client.get(self.GACHA_LIST_URL)
if req.is_error:
return BaseResponseData(error_message="请求错误")
self.cache["gacha_list_info"] = req.json()
self.cache["time"] = time.time()
return BaseResponseData(req.json())
async def get_gacha_info(self, gacha_id: str) -> dict:
cache = self.cache.get(gacha_id)
if cache is not None:
return cache
req = await self.client.get(self.GACHA_INFO_URL % gacha_id)
if req.is_error:
return {}
self.cache[gacha_id] = req.json()
return req.json()

View File

@ -1,96 +0,0 @@
from httpx import AsyncClient
from modules.apihelper.base import BaseResponseData
from modules.apihelper.helpers import get_ds, get_device_id, get_recognize_server
class Genshin:
SIGN_INFO_URL = "https://hk4e-api-os.hoyoverse.com/event/sol/info"
SIGN_URL = "https://hk4e-api-os.hoyoverse.com/event/sol/sign"
SIGN_HOME_URL = "https://hk4e-api-os.hoyoverse.com/event/sol/home"
APP_VERSION = "2.11.1"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " \
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
REFERER = "https://webstatic.hoyoverse.com"
ORIGIN = "https://webstatic.hoyoverse.com"
ACT_ID = "e202102251931481"
DS_SALT = "6cqshh5dhw73bzxn20oexa9k516chk7s"
def __init__(self):
self.headers = {
"Origin": self.ORIGIN,
'DS': get_ds(self.DS_SALT),
'x-rpc-app_version': self.APP_VERSION,
'User-Agent': self.USER_AGENT,
'x-rpc-client_type': '5', # 1为ios 2为安卓 4为pc_web 5为mobile_web
'Referer': self.REFERER,
'x-rpc-device_id': get_device_id(self.USER_AGENT)}
self.client = AsyncClient(headers=self.headers)
async def is_sign(self, uid: int, region: str = "", cookies: dict = None, lang: str = 'zh-cn'):
"""
检查是否签到
:param lang: 语言
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
if region == "":
region = get_recognize_server(uid)
params = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid,
"lang": lang
}
req = await self.client.get(self.SIGN_INFO_URL, params=params, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(req.json())
async def sign(self, uid: int, region: str = "", cookies: dict = None, lang: str = 'zh-cn'):
"""
执行签到
:param lang:
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
if region == "":
region = get_recognize_server(uid)
data = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid,
"lang": lang
}
req = await self.client.post(self.SIGN_URL, json=data, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="签到失败")
return BaseResponseData(req.json())
async def get_sign_give(self, cookies: dict = None, lang: str = 'zh-cn'):
"""
返回今日签到信息
:param lang:
:param cookies:
:return:
"""
params = {
"act_id": self.ACT_ID,
"lang": lang
}
req = await self.client.get(self.SIGN_HOME_URL, params=params, cookies=cookies)
if req.is_error:
return
return BaseResponseData(req.json())
async def __aenter__(self):
pass
async def __aexit__(self, exc_type, exc, tb):
await self.client.aclose()

View File

@ -1,17 +1,19 @@
import asyncio
import re
import time
from typing import List
import httpx
from httpx import AsyncClient
from modules.apihelper.base import HyperionResponse, ArtworkImage, BaseResponseData
from modules.apihelper.helpers import get_ds, get_device_id
from modules.apihelper.base import ArtworkImage, PostInfo
from modules.apihelper.helpers import get_device_id
from modules.apihelper.request.hoyorequest import HOYORequest
from utils.typedefs import JSONDict
class Hyperion:
"""
米忽悠bbs相关API请求
"""米忽悠bbs相关API请求
该名称来源于米忽悠的安卓BBS包名结尾考虑到大部分重要的功能确实是在移动端实现了
"""
@ -22,7 +24,7 @@ class Hyperion:
"Chrome/90.0.4430.72 Safari/537.36"
def __init__(self):
self.client = httpx.AsyncClient(headers=self.get_headers())
self.client = HOYORequest(headers=self.get_headers())
@staticmethod
def extract_post_id(text: str) -> int:
@ -81,47 +83,30 @@ class Hyperion:
f"{auto_orient}/interlace,{interlace}/format,{images_format}"
return {"x-oss-process": params}
async def get_post_full_in_collection(self, collection_id: int, gids: int = 2, order_type=1) -> BaseResponseData:
async def get_post_full_in_collection(self, collection_id: int, gids: int = 2, order_type=1) -> JSONDict:
params = {
"collection_id": collection_id,
"gids": gids,
"order_type": order_type
}
response = await self.client.get(url=self.POST_FULL_IN_COLLECTION_URL, params=params)
if response.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(response.json())
return response
async def get_artwork_info(self, gids: int, post_id: int, read: int = 1) -> HyperionResponse:
async def get_post_info(self, gids: int, post_id: int, read: int = 1) -> PostInfo:
params = {
"gids": gids,
"post_id": post_id,
"read": read
}
response = await self.client.get(self.POST_FULL_URL, params=params)
if response.is_error:
return HyperionResponse(error_message="请求错误")
return HyperionResponse(response.json())
async def get_post_full_info(self, gids: int, post_id: int, read: int = 1) -> BaseResponseData:
params = {
"gids": gids,
"post_id": post_id,
"read": read
}
response = await self.client.get(self.POST_FULL_URL, params=params)
if response.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(response.json())
return PostInfo.paste_data(response)
async def get_images_by_post_id(self, gids: int, post_id: int) -> List[ArtworkImage]:
artwork_info = await self.get_artwork_info(gids, post_id)
if artwork_info.error:
return []
urls = artwork_info.results.image_url_list
post_info = await self.get_post_info(gids, post_id)
art_list = []
task_list = [
self.download_image(artwork_info.post_id, urls[page], page) for page in range(len(urls))
self.download_image(post_info.post_id, post_info.image_urls[page], page)
for page in range(len(post_info.image_urls))
]
result_list = await asyncio.gather(*task_list)
for result in result_list:
@ -135,10 +120,8 @@ class Hyperion:
return art_list
async def download_image(self, art_id: int, url: str, page: int = 0) -> ArtworkImage:
response = await self.client.get(url, params=self.get_images_params(resize=2000), timeout=5)
if response.is_error:
return ArtworkImage(art_id, page, True)
return ArtworkImage(art_id, page, data=response.content)
response = await self.client.get(url, params=self.get_images_params(resize=2000), timeout=10, de_json=False)
return ArtworkImage(art_id=art_id, page=page, data=response)
async def get_new_list(self, gids: int, type_id: int, page_size: int = 20):
"""
@ -151,221 +134,163 @@ class Hyperion:
"type": type_id
}
response = await self.client.get(url=self.GET_NEW_LIST_URL, params=params)
if response.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(response.json())
return response
async def close(self):
await self.client.aclose()
await self.client.shutdown()
class YuanShen:
SIGN_INFO_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/info"
SIGN_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/sign"
SIGN_HOME_URL = "https://api-takumi.mihoyo.com/event/bbs_sign_reward/home"
class GachaInfo:
GACHA_LIST_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/gacha/list.json"
GACHA_INFO_URL = "https://webstatic.mihoyo.com/hk4e/gacha_info/cn_gf01/%s/zh-cn.json"
APP_VERSION = "2.3.0"
USER_AGENT = "Mozilla/5.0 (Linux; Android 9; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Version/4.0 Chrome/39.0.0.0 Mobile Safari/537.36 miHoYoBBS/2.3.0"
REFERER = "https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?" \
"bbs_auth_required=true&act_id=e202009291139501&utm_source=hyperion&utm_medium=mys&utm_campaign=icon"
ORIGIN = "https://webstatic.mihoyo.com"
ACT_ID = "e202009291139501"
DS_SALT = "h8w582wxwgqvahcdkpvdhbh2w9casgfl"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " \
"Chrome/90.0.4430.72 Safari/537.36"
def __init__(self):
self.headers = {
"Origin": self.ORIGIN,
'DS': get_ds(self.DS_SALT),
'x-rpc-app_version': self.APP_VERSION,
'User-Agent': self.USER_AGENT,
'x-rpc-client_type': '5', # 1为ios 2为安卓 4为pc_web 5为mobile_web
'Referer': self.REFERER,
'x-rpc-device_id': get_device_id(self.USER_AGENT)}
self.client = AsyncClient(headers=self.headers)
}
self.client = HOYORequest(headers=self.headers)
self.cache = {}
self.cache_ttl = 600
async def is_sign(self, uid: int, region: str = "cn_gf01", cookies: dict = None):
async def get_gacha_list_info(self) -> dict:
if self.cache.get("time", 0) + self.cache_ttl < time.time():
self.cache.clear()
cache = self.cache.get("gacha_list_info")
if cache is not None:
return cache
req = await self.client.get(self.GACHA_LIST_URL)
self.cache["gacha_list_info"] = req
self.cache["time"] = time.time()
return req
async def get_gacha_info(self, gacha_id: str) -> dict:
cache = self.cache.get(gacha_id)
if cache is not None:
return cache
req = await self.client.get(self.GACHA_INFO_URL % gacha_id)
self.cache[gacha_id] = req
return req
class SignIn:
LOGIN_URL = "https://webapi.account.mihoyo.com/Api/login_by_mobilecaptcha"
S_TOKEN_URL = "https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?" \
"login_ticket={0}&token_types=3&uid={1}"
BBS_URL = "https://api-takumi.mihoyo.com/account/auth/api/webLoginByMobile"
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " \
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
HEADERS = {
"Host": "webapi.account.mihoyo.com",
"Connection": "keep-alive",
"sec-ch-ua": "\".Not/A)Brand\";v=\"99\", \"Microsoft Edge\";v=\"103\", \"Chromium\";v=\"103\"",
"DNT": "1",
"x-rpc-device_model": "OS X 10.15.7",
"sec-ch-ua-mobile": "?0",
"User-Agent": USER_AGENT,
'x-rpc-device_id': get_device_id(USER_AGENT),
"Accept": "application/json, text/plain, */*",
"x-rpc-device_name": "Microsoft Edge 103.0.1264.62",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"x-rpc-client_type": "4",
"sec-ch-ua-platform": "\"macOS\"",
"Origin": "https://user.mihoyo.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://user.mihoyo.com/",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
}
BBS_HEADERS = {
"Host": "api-takumi.mihoyo.com",
"Content-Type": "application/json;charset=utf-8",
"Origin": "https://bbs.mihoyo.com",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"User-Agent": USER_AGENT,
"Referer": "https://bbs.mihoyo.com/",
"Accept-Language": "zh-CN,zh-Hans;q=0.9"
}
def __init__(self, phone: int):
self.phone = phone
self.client = AsyncClient()
self.uid = 0
self.cookie = {}
def parse_uid(self):
"""
检查是否签到
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
从cookie中获取uid
:param self:
:return:
"""
params = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid
}
req = await self.client.get(self.SIGN_INFO_URL, params=params, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="请求错误")
return BaseResponseData(req.json())
async def sign(self, uid: int, region: str = "cn_gf01", cookies: dict = None):
"""
执行签到
:param uid: 游戏UID
:param region: 服务器
:param cookies: cookie
:return:
"""
data = {
"act_id": self.ACT_ID,
"region": region,
"uid": uid
}
req = await self.client.post(self.SIGN_URL, json=data, cookies=cookies)
if req.is_error:
return BaseResponseData(error_message="签到失败")
return BaseResponseData(req.json())
async def get_sign_give(self, cookies: dict = None):
"""
返回今日签到信息
:param cookies:
:return:
"""
params = {
"act_id": self.ACT_ID
}
req = await self.client.get(self.SIGN_HOME_URL, params=params, cookies=cookies)
if req.is_error:
if "login_ticket" not in self.cookie:
return
return BaseResponseData(req.json())
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
if item in self.cookie:
self.uid = self.cookie[item]
break
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
self.cookie[item] = self.uid
async def __aenter__(self):
@staticmethod
def check_error(data: dict) -> bool:
"""
检查是否有错误
:param data:
:return:
"""
pass
res_data = data.get("data", {})
return res_data.get("msg") == "验证码错误" or res_data.get("info") == "Captcha not match Err"
async def __aexit__(self, exc_type, exc, tb):
"""
:param exc_type:
:param exc:
:param tb:
:return:
"""
await self.client.aclose()
async def login(self, captcha: int) -> bool:
data = await self.client.post(
self.LOGIN_URL,
data={"mobile": str(self.phone), "mobile_captcha": str(captcha), "source": "user.mihoyo.com"},
headers=self.HEADERS
)
res_json = data.json()
if self.check_error(res_json):
return False
class SignIn:
LOGIN_URL = "https://webapi.account.mihoyo.com/Api/login_by_mobilecaptcha"
S_TOKEN_URL = "https://api-takumi.mihoyo.com/auth/api/getMultiTokenByLoginTicket?" \
"login_ticket={0}&token_types=3&uid={1}"
BBS_URL = "https://api-takumi.mihoyo.com/account/auth/api/webLoginByMobile"
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " \
"AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15"
HEADERS = {
"Host": "webapi.account.mihoyo.com",
"Connection": "keep-alive",
"sec-ch-ua": "\".Not/A)Brand\";v=\"99\", \"Microsoft Edge\";v=\"103\", \"Chromium\";v=\"103\"",
"DNT": "1",
"x-rpc-device_model": "OS X 10.15.7",
"sec-ch-ua-mobile": "?0",
"User-Agent": USER_AGENT,
'x-rpc-device_id': get_device_id(USER_AGENT),
"Accept": "application/json, text/plain, */*",
"x-rpc-device_name": "Microsoft Edge 103.0.1264.62",
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
"x-rpc-client_type": "4",
"sec-ch-ua-platform": "\"macOS\"",
"Origin": "https://user.mihoyo.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://user.mihoyo.com/",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
}
BBS_HEADERS = {
"Host": "api-takumi.mihoyo.com",
"Content-Type": "application/json;charset=utf-8",
"Origin": "https://bbs.mihoyo.com",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"User-Agent": USER_AGENT,
"Referer": "https://bbs.mihoyo.com/",
"Accept-Language": "zh-CN,zh-Hans;q=0.9"
}
for k, v in data.cookies.items():
self.cookie[k] = v
def __init__(self, phone: int):
self.phone = phone
self.client = AsyncClient()
self.uid = 0
self.cookie = {}
self.parse_uid()
return bool(self.uid)
def parse_uid(self):
"""
从cookie中获取uid
:param self:
:return:
"""
if "login_ticket" not in self.cookie:
return
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
if item in self.cookie:
self.uid = self.cookie[item]
break
for item in ["login_uid", "stuid", "ltuid", "account_id"]:
self.cookie[item] = self.uid
async def get_s_token(self):
data = await self.client.get(
self.S_TOKEN_URL.format(self.cookie["login_ticket"], self.uid),
headers={"User-Agent": self.USER_AGENT}
)
res_json = data.json()
res_data = res_json.get("data", {}).get("list", [])
for i in res_data:
if i.get("name") and i.get("token"):
self.cookie[i.get("name")] = i.get("token")
@staticmethod
def check_error(data: dict) -> bool:
"""
检查是否有错误
:param data:
:return:
"""
res_data = data.get("data", {})
return res_data.get("msg") == "验证码错误" or res_data.get("info") == "Captcha not match Err"
async def get_token(self, captcha: int) -> bool:
data = await self.client.post(
self.BBS_URL,
headers=self.BBS_HEADERS,
json={
"is_bh2": False,
"mobile": str(self.phone),
"captcha": str(captcha),
"action_type": "login",
"token_type": 6
}
)
res_json = data.json()
if self.check_error(res_json):
return False
async def login(self, captcha: int) -> bool:
data = await self.client.post(
self.LOGIN_URL,
data={"mobile": str(self.phone), "mobile_captcha": str(captcha), "source": "user.mihoyo.com"},
headers=self.HEADERS
)
res_json = data.json()
if self.check_error(res_json):
return False
for k, v in data.cookies.items():
self.cookie[k] = v
for k, v in data.cookies.items():
self.cookie[k] = v
self.parse_uid()
return bool(self.uid)
async def get_s_token(self):
data = await self.client.get(
self.S_TOKEN_URL.format(self.cookie["login_ticket"], self.uid),
headers={"User-Agent": self.USER_AGENT}
)
res_json = data.json()
res_data = res_json.get("data", {}).get("list", [])
for i in res_data:
if i.get("name") and i.get("token"):
self.cookie[i.get("name")] = i.get("token")
async def get_token(self, captcha: int) -> bool:
data = await self.client.post(
self.BBS_URL,
headers=self.BBS_HEADERS,
json={
"is_bh2": False,
"mobile": str(self.phone),
"captcha": str(captcha),
"action_type": "login",
"token_type": 6
}
)
res_json = data.json()
if self.check_error(res_json):
return False
for k, v in data.cookies.items():
self.cookie[k] = v
return "cookie_token" in self.cookie
return "cookie_token" in self.cookie

View File

@ -0,0 +1,39 @@
from typing import Union
import httpx
from modules.apihelper.error import NetworkException, ResponseException, DataNotFoundError, TimedOut
from modules.apihelper.request.httpxrequest import HTTPXRequest
from modules.apihelper.typedefs import POST_DATA, JSON_DATA
class HOYORequest(HTTPXRequest):
async def get(self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs) \
-> Union[POST_DATA, JSON_DATA, bytes]:
try:
response = await self._client.get(url=url, *args, **kwargs)
except httpx.TimeoutException as err:
raise TimedOut from err
except httpx.HTTPError as exc:
raise NetworkException(f"Unknown error in HTTP implementation: {repr(exc)}") from exc
if response.is_error:
raise ResponseException(message=f"response error in status code: {response.status_code}")
if not de_json:
return response.content
json_data = response.json()
return_code = json_data.get("retcode", None)
data = json_data.get("data", None)
message = json_data.get("message", None)
if return_code is None:
if data is None:
raise DataNotFoundError
return json_data
if return_code != 0:
if message is None:
raise ResponseException(message=f"response error in return code: {return_code}")
else:
raise ResponseException(response=json_data)
if not re_json_data and data is not None:
return data
return json_data

View File

@ -0,0 +1,33 @@
from contextlib import AbstractAsyncContextManager
from types import TracebackType
from typing import Optional, Type
import httpx
class HTTPXRequest(AbstractAsyncContextManager):
def __init__(self, *args, headers=None, **kwargs):
self._client = httpx.AsyncClient(headers=headers, *args, **kwargs)
async def __aenter__(self):
try:
await self.initialize()
return self
except Exception as exc:
await self.shutdown()
raise exc
async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType]) -> None:
await self.initialize()
async def initialize(self):
if self._client.is_closed:
self._client = httpx.AsyncClient()
async def shutdown(self):
if self._client.is_closed:
return
await self._client.aclose()

View File

@ -0,0 +1,4 @@
from typing import Dict, Any
POST_DATA = Dict[str, Any]
JSON_DATA = Dict[str, Any]

View File

@ -15,7 +15,7 @@ from core.plugin import Plugin, handler, conversation
from core.user.error import UserNotFoundError
from core.user.models import User
from core.user.services import UserService
from modules.apihelper.hyperion import YuanShen
from modules.apihelper.hyperion import SignIn
from utils.decorators.error import error_callable
from utils.decorators.restricts import restricts
from utils.log import logger
@ -29,7 +29,7 @@ class AddUserCommandData(TelegramObject):
cookies: dict = {}
game_uid: int = 0
phone: int = 0
sign_in_client: Optional[YuanShen.SignIn] = None
sign_in_client: Optional[SignIn] = None
CHECK_SERVER, CHECK_PHONE, CHECK_CAPTCHA, INPUT_COOKIES, COMMAND_RESULT = range(10100, 10105)
@ -171,7 +171,7 @@ class SetUserCookies(Plugin.Conversation, BasePlugin.Conversation):
add_user_command_data: AddUserCommandData = context.chat_data.get("add_user_command_data")
if not add_user_command_data.sign_in_client:
phone = add_user_command_data.phone
client = YuanShen.SignIn(phone)
client = SignIn(phone)
try:
success = await client.login(captcha)
if not success:

View File

@ -11,7 +11,7 @@ from telegram.ext import CallbackContext, CommandHandler, MessageHandler, filter
from core.baseplugin import BasePlugin
from core.plugin import Plugin, handler
from core.template import TemplateService
from modules.apihelper.gacha import GachaInfo
from modules.apihelper.hyperion import GachaInfo
from plugins.genshin.gacha.wish import WishCountInfo, get_one
from utils.bot import get_all_args
from utils.decorators.error import error_callable
@ -41,12 +41,12 @@ class Gacha(Plugin, BasePlugin):
async def gacha_info(self, gacha_name: str = "角色活动", default: bool = False):
gacha_list_info = await self.gacha.get_gacha_list_info()
gacha_id = ""
for gacha in gacha_list_info.data["list"]:
for gacha in gacha_list_info["list"]:
if gacha["gacha_name"] == gacha_name:
gacha_id = gacha["gacha_id"]
if gacha_id == "":
if default and len(gacha_list_info.data["list"]) > 0:
gacha_id = gacha_list_info.data["list"][0]["gacha_id"]
if default and len(gacha_list_info["list"]) > 0:
gacha_id = gacha_list_info["list"][0]["gacha_id"]
else:
raise GachaNotFound(gacha_name)
gacha_info = await self.gacha.get_gacha_info(gacha_id)

View File

@ -74,9 +74,9 @@ class Post(Plugin.Conversation, BasePlugin):
if post_id == -1:
await message.reply_text("获取作品ID错误请检查连接是否合法", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END
post_full_info = await self.bbs.get_post_full_info(2, post_id)
post_info = await self.bbs.get_post_info(2, post_id)
post_images = await self.bbs.get_images_by_post_id(2, post_id)
post_data = post_full_info.data["post"]["post"]
post_data = post_info["post"]["post"]
post_subject = post_data['subject']
post_soup = BeautifulSoup(post_data["content"], features="html.parser")
post_p = post_soup.find_all('p')
@ -93,6 +93,9 @@ class Post(Plugin.Conversation, BasePlugin):
if len(post_images) > 1:
media = [InputMediaPhoto(img_info.data) for img_info in post_images]
media[0] = InputMediaPhoto(post_images[0].data, caption=post_text, parse_mode=ParseMode.MARKDOWN_V2)
if len(media) > 10:
media = media[0:10]
await message.reply_text("获取到的图片已经超过10张为了保证发送成功已经删除一部分图片")
await message.reply_media_group(media)
elif len(post_images) == 1:
image = post_images[0]
@ -284,3 +287,4 @@ class Post(Plugin.Conversation, BasePlugin):
return ConversationHandler.END
await message.reply_text("推送成功", reply_markup=ReplyKeyboardRemove())
return ConversationHandler.END

View File

@ -21,9 +21,7 @@ async def test_get_strategy(hyperion):
async def get_post_id(_collection_id: int, character_name: str) -> str:
post_full_in_collection = await hyperion.get_post_full_in_collection(_collection_id)
if post_full_in_collection.error:
raise RuntimeError(f"获取收藏信息错误,错误信息为:{post_full_in_collection.message}")
for post_data in post_full_in_collection.data["posts"]:
for post_data in post_full_in_collection["posts"]:
topics = post_data["topics"]
for topic in topics:
if character_name == topic["name"]:

View File

@ -0,0 +1,45 @@
"""Test Url
https://bbs.mihoyo.com/ys/article/29023709
"""
import logging
import pytest
import pytest_asyncio
from bs4 import BeautifulSoup
from flaky import flaky
from modules.apihelper.base import PostInfo
from modules.apihelper.hyperion import Hyperion
LOGGER = logging.getLogger(__name__)
@pytest_asyncio.fixture
async def hyperion():
_hyperion = Hyperion()
yield _hyperion
await _hyperion.close()
# noinspection PyShadowingNames
@pytest.mark.asyncio
@flaky(3, 1)
async def test_get_post_info(hyperion):
post_info = await hyperion.get_post_info(2, 29023709)
assert post_info
assert isinstance(post_info, PostInfo)
assert post_info["post"]["post"]["post_id"] == '29023709'
assert post_info.post_id == 29023709
assert post_info["post"]["post"]["subject"] == "《原神》长期项目启动·概念PV"
assert post_info.subject == "《原神》长期项目启动·概念PV"
assert len(post_info["post"]["post"]["images"]) == 1
post_soup = BeautifulSoup(post_info["post"]["post"]["content"], features="html.parser")
assert post_soup.find_all('p')
# noinspection PyShadowingNames
@pytest.mark.asyncio
@flaky(3, 1)
async def test_get_images_by_post_id(hyperion):
post_images = await hyperion.get_images_by_post_id(2, 29023709)
assert len(post_images) == 1

View File

@ -9,6 +9,7 @@ from telegram import Update, ReplyKeyboardRemove
from telegram.error import BadRequest, TimedOut, Forbidden
from telegram.ext import CallbackContext, ConversationHandler
from modules.apihelper.error import APIHelperException, ReturnCodeError
from utils.error import UrlResourcesNotFoundError
from utils.log import logger
@ -104,6 +105,14 @@ def error_callable(func: Callable) -> Callable:
logger.exception(exc)
await send_user_notification(update, context, "出错了呜呜呜 ~ 获取账号信息发生错误")
return ConversationHandler.END
except ReturnCodeError as exc:
await send_user_notification(update, context, f"出错了呜呜呜 ~ API请求错误 错误信息为 {exc.message}")
return ConversationHandler.END
except APIHelperException as exc:
logger.error("APIHelperException")
logger.exception(exc)
await send_user_notification(update, context, "出错了呜呜呜 ~ API请求错误")
return ConversationHandler.END
except BadRequest as exc:
logger.warning("python-telegram-bot 请求错误")
logger.exception(exc)