# EnkaNetwork.py/enkanetwork/http.py

from __future__ import annotations
import asyncio
import aiohttp
import logging
from typing import (
Any,
ClassVar,
Optional,
TypeVar,
Coroutine,
Dict,
Union,
TYPE_CHECKING
)
from . import utils
from .utils import MISSING, RETRY_MAX
from .exception import (
VaildateUIDError,
UIDNotFounded,
HTTPException,
Forbidden,
EnkaServerError
)
if TYPE_CHECKING:
    # Everything in this branch exists only for static type checkers;
    # typing.TYPE_CHECKING is False at runtime, so none of it is executed.
    from .types.enkanetwork import (
        Default as DefaultPayload,
        EnkaNetwork as EnkaNetworkPayload,
    )

    # Generic "coroutine that resolves to T" alias used by the fetch_* return
    # annotations.
    T = TypeVar('T')
    Response = Coroutine[Any, Any, T]
class Route:
    """Describe a single HTTP request: its method and fully-built URL.

    The URL is produced by substituting ``path`` into one of two templates:
    ``BASE_URL`` when ``endpoint == 'enka'``, and ``RAW_DATA_URL`` for any
    other ``endpoint`` value.

    Attributes:
        method: HTTP method name passed through unchanged (e.g. ``'GET'``).
        uid: Optional UID associated with the request; stored as given.
        endpoint: The endpoint selector that was passed in.
        url: The resolved absolute URL.
    """

    BASE_URL: ClassVar[str] = "https://enka.network{PATH}"
    RAW_DATA_URL: ClassVar[str] = "https://raw.githubusercontent.com/mrwan200/enkanetwork.py-data/{PATH}"

    def __init__(
        self,
        method: str,
        path: str,
        endpoint: str = 'enka',
        uid: Optional[Union[str, int]] = None,
    ) -> None:
        """Build the route.

        Args:
            method: HTTP method name.
            path: Path substituted into the URL template. For the ``'enka'``
                endpoint it is appended verbatim to the host, so it should
                start with ``/``. For any other endpoint a leading ``/`` is
                optional.
            endpoint: ``'enka'`` selects ``BASE_URL``; anything else selects
                ``RAW_DATA_URL``.
            uid: Optional UID carried along with the route.
        """
        self.method = method
        self.uid = uid
        self.endpoint = endpoint

        if endpoint == 'enka':
            self.url: str = self.BASE_URL.format(PATH=path)
        else:
            # RAW_DATA_URL already ends with "/" right before {PATH}; strip
            # leading slashes from the path so the result does not contain an
            # empty "//" segment.
            self.url = self.RAW_DATA_URL.format(PATH=path.lstrip('/'))
class HTTPClient:
    """Small asynchronous HTTP client built on a lazily created aiohttp session.

    The session is created on the first call to :meth:`request` and released
    by :meth:`close`.
    """

    LOGGER = logging.getLogger(__name__)

    # errno values that mean "connection reset by peer":
    # 54 (macOS/BSD), 104 (Linux) and 10054 (Windows, WSAECONNRESET).
    _CONNECTION_RESET_ERRNOS = (54, 104, 10054)

    # Upper bound on how many times a connection reset is retried, regardless
    # of how large RETRY_MAX is.
    _MAX_RESET_RETRIES = 4

    def __init__(self, *, key: str = '', agent: str = '') -> None:
        """Store client options; no network resources are acquired here.

        Args:
            key: Optional key appended as a ``?key=`` query string by
                :meth:`fetch_user` when non-empty.
            agent: Optional ``User-Agent`` header value sent with every
                request when non-empty.
        """
        self.__session: aiohttp.ClientSession = MISSING
        self.__headers: Dict = {}
        self.__agent: str = agent
        self.__key: str = key

    async def close(self) -> None:
        """Close the aiohttp session if one is open; otherwise do nothing."""
        if self.__session is not MISSING:
            await self.__session.close()
            self.__session = MISSING
            self.LOGGER.debug('Session closed')
        else:
            self.LOGGER.debug('Session already closed')

    async def request(self, route: Route, **kwargs: Any) -> Any:
        """Perform the request described by ``route``, retrying on failure.

        Up to ``RETRY_MAX`` attempts are made. Responses with a status of 400
        or above (except 403) are retried after a one second pause; connection
        resets are retried with an increasing pause.

        Args:
            route: Method, URL and UID of the request.
            **kwargs: Extra arguments forwarded to the aiohttp session's
                ``request`` call. Any ``headers`` entry is replaced by the
                headers built here.

        Returns:
            The data produced by ``utils.to_data`` for a 200 response.

        Raises:
            UIDNotFounded: A 2xx response whose data has a falsy ``'content'``
                entry, or whose status is not exactly 200.
            Forbidden: The server answered 403.
            EnkaServerError: The last response after all retries was >= 500.
            HTTPException: Any other unsuccessful response.
            OSError: A connection error that is not retried (or no retries
                are left).
            RuntimeError: No attempt was made at all (``RETRY_MAX`` is not
                positive).
        """
        method = route.method
        url = route.url
        uid = route.uid

        # Rebuild the per-client header overrides for this request.
        self.__headers.clear()
        if self.__agent != '':
            self.__headers['User-Agent'] = self.__agent

        # Client overrides win over the library defaults.
        kwargs['headers'] = {**utils.get_default_header(), **self.__headers}

        response: Optional[aiohttp.ClientResponse] = None
        data: Optional[Dict[str, Any]] = None

        if self.__session is MISSING:
            self.__session = aiohttp.ClientSession()

        last_try = RETRY_MAX - 1
        # Never schedule a reset retry on the final attempt: the loop would
        # end and the original OSError would be lost.
        reset_retry_limit = min(self._MAX_RESET_RETRIES, last_try)

        for tries in range(RETRY_MAX):
            try:
                async with self.__session.request(method, url, **kwargs) as response:
                    if 300 > response.status >= 200:
                        data = await utils.to_data(response)

                        if not data['content'] or response.status != 200:
                            raise UIDNotFounded(f"UID {uid} not found.")

                        self.LOGGER.debug('%s %s has received %s', method, url, data)
                        return data

                    # Checked before the generic retry branch below; otherwise
                    # a 403 would be retried and never reported as Forbidden.
                    if response.status == 403:
                        raise Forbidden("Forbidden 403")  # TODO: write a more descriptive message

                    # Every other 4xx/5xx (including 429 rate limiting, which
                    # gets no special handling) is retried.
                    if response.status >= 400:
                        self.LOGGER.warning(
                            "Failure to fetch %s (%s) Retry %s / %s",
                            url, response.status, tries, RETRY_MAX,
                        )
                        if tries >= last_try:
                            # Out of attempts: skip the pointless sleep and
                            # let the code after the loop raise.
                            break
                        await asyncio.sleep(1)
                        continue

                    # Status outside 2xx and below 400.
                    raise HTTPException("Unknown error")

            except OSError as e:
                # Connection reset by peer
                if tries < reset_retry_limit and e.errno in self._CONNECTION_RESET_ERRNOS:
                    await asyncio.sleep(1 + tries * 2)
                    continue
                raise

        if response is not None:
            # We've run out of retries, raise.
            if response.status >= 500:
                raise EnkaServerError("Server error")

            raise HTTPException("Unknown error")

        raise RuntimeError('Unreachable code in HTTP handling')

    def fetch_user(self, uid: Union[str, int]) -> Response[EnkaNetworkPayload]:
        """Return the coroutine that fetches the ``__data.json`` payload for ``uid``.

        The UID is validated immediately (before the coroutine is awaited).
        The client's key, when set, is appended as a ``?key=`` query string.

        Raises:
            VaildateUIDError: ``utils.validate_uid`` rejected the UID.
        """
        if not utils.validate_uid(str(uid)):
            raise VaildateUIDError("Validate UID failed. Please check your UID.")

        r = Route(
            'GET',
            f'/u/{uid}/__data.json' + (f"?key={self.__key}" if self.__key else ""),
            endpoint='enka',
            uid=uid
        )
        return self.request(r)

    def fetch_asset(self, folder: str, filename: str) -> Response[DefaultPayload]:
        """Return the coroutine that fetches ``/master/exports/{folder}/{filename}``.

        The route uses the ``'assets'`` endpoint, which ``Route`` resolves
        against its raw-data URL template.
        """
        r = Route(
            'GET',
            f'/master/exports/{folder}/{filename}',
            endpoint='assets'
        )
        return self.request(r)