mirror of
https://github.com/PaiGramTeam/EnkaNetwork.py.git
synced 2024-11-16 03:45:28 +00:00
159 lines
5.2 KiB
Python
159 lines
5.2 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import json
|
|
import asyncio
|
|
import aiohttp
|
|
import logging
|
|
|
|
from typing import (
|
|
Any,
|
|
ClassVar,
|
|
Optional,
|
|
TypeVar,
|
|
Coroutine,
|
|
NoReturn,
|
|
Dict,
|
|
Union,
|
|
TYPE_CHECKING
|
|
)
|
|
from . import utils
|
|
from .utils import MISSING, RETRY_MAX
|
|
from .exception import (
|
|
VaildateUIDError,
|
|
UIDNotFounded,
|
|
HTTPException,
|
|
Forbidden,
|
|
EnkaServerError
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
T = TypeVar('T')
|
|
Response = Coroutine[Any, Any, T]
|
|
from .types.enkanetwork import EnkaNetwork as EnkaNetworkPayload
|
|
|
|
class Route:
|
|
|
|
BASE_URL: ClassVar[str] = "https://enka.network{PATH}"
|
|
RAW_DATA_URL = "https://raw.githubusercontent.com/mrwan200/enkanetwork.py-data/{PATH}"
|
|
|
|
def __init__(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
endpoint: str = 'enka',
|
|
uid: Optional[str] = None,
|
|
):
|
|
self.method = method
|
|
self.uid = uid
|
|
self.url = ''
|
|
if endpoint == 'enka':
|
|
self.url: str = self.BASE_URL.format(PATH=path)
|
|
else:
|
|
self.url: str = self.RAW_DATA_URL.format(PATH=path)
|
|
|
|
class HTTPClient:
|
|
|
|
LOGGER = logging.getLogger(__name__)
|
|
|
|
def __init__(self, *, key: str = '', agent: str = '') -> None:
|
|
self.__session: aiohttp.ClientSession = MISSING
|
|
self.__headers: Dict = {}
|
|
self.__agent = agent
|
|
self.__key = key
|
|
|
|
async def close(self) -> None:
|
|
if self.__session is not MISSING:
|
|
await self.__session.close()
|
|
self.__session = MISSING
|
|
self.LOGGER.debug('Session closed')
|
|
else:
|
|
self.LOGGER.debug('Session already closed')
|
|
|
|
async def request(self, route: Route, **kwargs: Any) -> Any:
|
|
method = route.method
|
|
url = route.url
|
|
uid = route.uid
|
|
|
|
self.__headers.clear()
|
|
if self.__agent != '':
|
|
self.__headers['User-Agent'] = self.__agent
|
|
|
|
kwargs['headers'] = {**utils.get_default_header(), **self.__headers}
|
|
|
|
response: Optional[aiohttp.ClientResponse] = None
|
|
data: Optional[Union[Dict[str, Any]]] = None
|
|
|
|
if self.__session is MISSING:
|
|
self.__session = aiohttp.ClientSession()
|
|
|
|
for tries in range(RETRY_MAX):
|
|
try:
|
|
async with self.__session.request(method, url, **kwargs) as response:
|
|
if 300 > response.status >= 200:
|
|
data = await utils.to_data(response)
|
|
|
|
if not data['content'] or response.status != 200:
|
|
raise UIDNotFounded(f"UID {uid} not found.")
|
|
|
|
self.LOGGER.debug('%s %s has received %s', method, url, data)
|
|
return data
|
|
|
|
# we are being rate limited
|
|
# if response.status == 429:
|
|
# Banned by Cloudflare more than likely.
|
|
|
|
if response.status >= 400:
|
|
self.LOGGER.warning(f"Failure to fetch {url} ({response.status}) Retry {tries} / {RETRY_MAX}")
|
|
if tries > RETRY_MAX:
|
|
raise HTTPException(f"Failed to download {url}")
|
|
await asyncio.sleep(1 + tries * 2) # 1 + tries * 2
|
|
continue
|
|
|
|
if response.status == 403:
|
|
raise Forbidden("Forbidden 403") # TODO: คิดไม่ออกจะพิมพ์อะไร
|
|
elif response.status >= 500:
|
|
raise EnkaServerError("Server error")
|
|
else:
|
|
raise HTTPException("Unknown error")
|
|
|
|
except OSError as e:
|
|
# Connection reset by peer
|
|
if tries < 4 and e.errno in (54, 10054):
|
|
await asyncio.sleep(1 + tries * 2)
|
|
continue
|
|
raise
|
|
|
|
if response is not None:
|
|
# We've run out of retries, raise.
|
|
if response.status >= 500:
|
|
raise EnkaServerError("Server error")
|
|
|
|
raise HTTPException("Unknown error")
|
|
|
|
raise RuntimeError('Unreachable code in HTTP handling')
|
|
|
|
def fetch_user(self, uid: Union[str, int]) -> Response[EnkaNetworkPayload]:
|
|
if not utils.validate_uid(str(uid)):
|
|
raise VaildateUIDError("Validate UID failed. Please check your UID.")
|
|
url = f'/u/{uid}/__data.json' + ("?key={key}" if self.__key else "")
|
|
return self.request(Route('GET', url, 'enka', uid))
|
|
|
|
async def update_asset(self, path: dict) -> NoReturn:
|
|
|
|
self.LOGGER.debug("Downloading new content...")
|
|
|
|
for folder in path:
|
|
for filename in os.listdir(path[folder]):
|
|
self.LOGGER.debug(f"Downloading {folder} file {filename}...")
|
|
|
|
# get new assets
|
|
url = f"master/exports/{folder}/{filename}"
|
|
data = await self.request(Route('GET', url, 'assets'))
|
|
|
|
self.LOGGER.debug(f"Writing {folder} file {filename}...")
|
|
|
|
# dumps to json file
|
|
with open(os.path.join(path[folder], filename), "w", encoding="utf-8") as f:
|
|
json.dump(data["content"], f, ensure_ascii=False, indent=4)
|