PamGram/modules/apihelper/client/base/hyperionrequest.py

67 lines
2.7 KiB
Python
Raw Normal View History

2022-10-08 00:59:08 +00:00
from typing import Union
import httpx
from httpx import Response
2022-10-08 00:59:08 +00:00
2022-12-10 12:37:43 +00:00
from .httpxrequest import HTTPXRequest
from ...error import NetworkException, ResponseException, APIHelperTimedOut
from ...typedefs import POST_DATA, JSON_DATA
2022-10-08 00:59:08 +00:00
2022-12-10 12:37:43 +00:00
# Public API of this module: only the request helper class is exported.
__all__ = ("HyperionRequest",)
2022-10-08 00:59:08 +00:00
2022-12-10 12:37:43 +00:00
class HyperionRequest(HTTPXRequest):
    """Request helper that sends GET/POST calls and unwraps the JSON reply.

    Both verbs share one pipeline: send the request through ``self._client``,
    translate ``httpx`` failures into the package's own exceptions, reject
    HTTP error statuses, and (optionally) unwrap the decoded JSON body based
    on its ``retcode`` / ``data`` / ``message`` keys.
    """

    async def get(
        self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
    ) -> Union[POST_DATA, JSON_DATA, Response]:
        """Send a GET request to *url* and return the processed result.

        Args:
            url: Target URL.
            *args: Extra positional arguments forwarded to the client call.
            de_json: When false, return the raw ``Response`` without decoding.
            re_json_data: When true, return the whole decoded JSON body even
                if it carries a non-``None`` ``"data"`` entry.
            **kwargs: Extra keyword arguments forwarded to the client call.

        Returns:
            The raw response, the decoded JSON body, or its ``"data"`` entry,
            depending on ``de_json`` / ``re_json_data`` and the body content.

        Raises:
            APIHelperTimedOut: The client raised ``httpx.TimeoutException``.
            NetworkException: The client raised any other ``httpx.HTTPError``.
            ResponseException: The HTTP status is an error, or the body's
                ``"retcode"`` is present and not ``0``.
        """
        return await self._request_and_unwrap(self._client.get, url, args, kwargs, de_json, re_json_data)

    async def post(
        self, url: str, *args, de_json: bool = True, re_json_data: bool = False, **kwargs
    ) -> Union[POST_DATA, JSON_DATA, Response]:
        """Send a POST request to *url* and return the processed result.

        Args:
            url: Target URL.
            *args: Extra positional arguments forwarded to the client call.
            de_json: When false, return the raw ``Response`` without decoding.
            re_json_data: When true, return the whole decoded JSON body even
                if it carries a non-``None`` ``"data"`` entry.
            **kwargs: Extra keyword arguments forwarded to the client call.

        Returns:
            The raw response, the decoded JSON body, or its ``"data"`` entry,
            depending on ``de_json`` / ``re_json_data`` and the body content.

        Raises:
            APIHelperTimedOut: The client raised ``httpx.TimeoutException``.
            NetworkException: The client raised any other ``httpx.HTTPError``.
            ResponseException: The HTTP status is an error, or the body's
                ``"retcode"`` is present and not ``0``.
        """
        return await self._request_and_unwrap(self._client.post, url, args, kwargs, de_json, re_json_data)

    async def _request_and_unwrap(self, send, url, args, kwargs, de_json, re_json_data):
        """Run the pipeline shared by ``get`` and ``post`` using the client method *send*."""
        # ``args`` / ``kwargs`` arrive as a plain tuple and dict so that caller
        # supplied keyword names can never clash with this helper's parameters.
        try:
            response = await send(url=url, *args, **kwargs)
        except httpx.TimeoutException as err:
            # Must be handled before HTTPError: the generic handler below
            # would otherwise swallow timeouts as well.
            raise APIHelperTimedOut from err
        except httpx.HTTPError as exc:
            raise NetworkException(f"Unknown error in HTTP implementation: {repr(exc)}") from exc
        if response.is_error:
            raise ResponseException(message=f"response error in status code: {response.status_code}")
        if not de_json:
            return response
        # Decoding errors from ``response.json()`` are deliberately not caught
        # here; they propagate to the caller unchanged.
        return self._unwrap_envelope(response.json(), re_json_data)

    @staticmethod
    def _unwrap_envelope(json_data, re_json_data):
        """Check ``retcode`` in a decoded body and pick the value to return."""
        return_code = json_data.get("retcode", None)
        data = json_data.get("data", None)
        message = json_data.get("message", None)
        if return_code is None:
            # No return code in the body: hand the body back untouched.
            return json_data
        if return_code != 0:
            if message is None:
                raise ResponseException(message=f"response error in return code: {return_code}")
            # With a message present the exception is built from the whole body.
            raise ResponseException(response=json_data)
        if not re_json_data and data is not None:
            return data
        return json_data