mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-16 20:10:36 +00:00
38541428b9
✨ 添加验证通过插件
Co-authored-by: xtaodada <xtao@xtaolink.cn>
67 lines
2.7 KiB
Python
67 lines
2.7 KiB
Python
from typing import Union
|
|
|
|
import httpx
|
|
from httpx import Response
|
|
|
|
from modules.apihelper.error import NetworkException, ResponseException, APIHelperTimedOut
|
|
from modules.apihelper.request.httpxrequest import HTTPXRequest
|
|
from modules.apihelper.typedefs import POST_DATA, JSON_DATA
|
|
|
|
|
|
class HOYORequest(HTTPXRequest):
|
|
async def get(
|
|
self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
|
|
) -> Union[POST_DATA, JSON_DATA, Response]:
|
|
try:
|
|
response = await self._client.get(url=url, *args, **kwargs)
|
|
except httpx.TimeoutException as err:
|
|
raise APIHelperTimedOut from err
|
|
except httpx.HTTPError as exc:
|
|
raise NetworkException(f"Unknown error in HTTP implementation: {repr(exc)}") from exc
|
|
if response.is_error:
|
|
raise ResponseException(message=f"response error in status code: {response.status_code}")
|
|
if not de_json:
|
|
return response
|
|
json_data = response.json()
|
|
return_code = json_data.get("retcode", None)
|
|
data = json_data.get("data", None)
|
|
message = json_data.get("message", None)
|
|
if return_code is None:
|
|
return json_data
|
|
if return_code != 0:
|
|
if message is None:
|
|
raise ResponseException(message=f"response error in return code: {return_code}")
|
|
else:
|
|
raise ResponseException(response=json_data)
|
|
if not re_json_data and data is not None:
|
|
return data
|
|
return json_data
|
|
|
|
async def post(
|
|
self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
|
|
) -> Union[POST_DATA, JSON_DATA, Response]:
|
|
try:
|
|
response = await self._client.post(url=url, *args, **kwargs)
|
|
except httpx.TimeoutException as err:
|
|
raise APIHelperTimedOut from err
|
|
except httpx.HTTPError as exc:
|
|
raise NetworkException(f"Unknown error in HTTP implementation: {repr(exc)}") from exc
|
|
if response.is_error:
|
|
raise ResponseException(message=f"response error in status code: {response.status_code}")
|
|
if not de_json:
|
|
return response
|
|
json_data = response.json()
|
|
return_code = json_data.get("retcode", None)
|
|
data = json_data.get("data", None)
|
|
message = json_data.get("message", None)
|
|
if return_code is None:
|
|
return json_data
|
|
if return_code != 0:
|
|
if message is None:
|
|
raise ResponseException(message=f"response error in return code: {return_code}")
|
|
else:
|
|
raise ResponseException(response=json_data)
|
|
if not re_json_data and data is not None:
|
|
return data
|
|
return json_data
|