"""Asynchronous HTTP layer: URL routes and a retrying aiohttp-based client."""
from __future__ import annotations

import os
import json
import asyncio
import aiohttp
import logging

from typing import (
    Any,
    ClassVar,
    Optional,
    TypeVar,
    Coroutine,
    NoReturn,
    Dict,
    Union,
    TYPE_CHECKING
)

from . import utils
from .utils import MISSING, RETRY_MAX
from .exception import (
    VaildateUIDError,
    UIDNotFounded,
    HTTPException,
    Forbidden,
    EnkaServerError
)

if TYPE_CHECKING:
    T = TypeVar('T')
    Response = Coroutine[Any, Any, T]
    from .types.enkanetwork import EnkaNetwork as EnkaNetworkPayload


class Route:
    """A single request target: HTTP method, fully-formatted URL and optional UID.

    ``endpoint == 'enka'`` formats ``path`` into ``BASE_URL``; any other value
    formats it into ``RAW_DATA_URL``.
    """

    BASE_URL: ClassVar[str] = "https://enka.network{PATH}"
    RAW_DATA_URL: ClassVar[str] = "https://raw.githubusercontent.com/mrwan200/enkanetwork.py-data/{PATH}"

    def __init__(
        self,
        method: str,
        path: str,
        endpoint: str = 'enka',
        uid: Optional[str] = None,
    ):
        self.method = method
        self.uid = uid

        template = self.BASE_URL if endpoint == 'enka' else self.RAW_DATA_URL
        self.url: str = template.format(PATH=path)


class HTTPClient:
    """Thin aiohttp wrapper that lazily opens a session and retries failures."""

    LOGGER = logging.getLogger(__name__)

    def __init__(self, *, key: str = '', agent: str = '') -> None:
        """Store the optional API ``key`` and ``User-Agent``; no session is opened yet."""
        self.__session: aiohttp.ClientSession = MISSING
        self.__headers: Dict = {}
        self.__agent = agent
        self.__key = key

    async def close(self) -> None:
        """Close the underlying session if one is open, then reset it to ``MISSING``."""
        if self.__session is not MISSING:
            await self.__session.close()
            self.__session = MISSING
            self.LOGGER.debug('Session closed')
        else:
            self.LOGGER.debug('Session already closed')

    async def request(self, route: Route, **kwargs: Any) -> Any:
        """Perform ``route`` and return the decoded payload, retrying on failure.

        Any ``headers`` entry in ``kwargs`` is replaced by the default headers
        merged with the configured ``User-Agent``. Remaining ``kwargs`` are
        forwarded to ``aiohttp.ClientSession.request``.

        Up to ``RETRY_MAX`` attempts are made, sleeping ``1 + tries * 2``
        seconds between attempts.

        Raises:
            UIDNotFounded: 2xx response whose ``'content'`` is falsy, or whose
                status is not exactly 200.
            Forbidden: the server answered 403 (raised at once, not retried).
            EnkaServerError: retries exhausted and the last status was >= 500.
            HTTPException: retries exhausted on another status >= 400, or an
                unexpected status below 200 / in the 3xx range.
            OSError: a connection error that is not a retryable reset.
        """
        method = route.method
        url = route.url
        uid = route.uid

        self.__headers.clear()
        if self.__agent != '':
            self.__headers['User-Agent'] = self.__agent

        kwargs['headers'] = {**utils.get_default_header(), **self.__headers}

        response: Optional[aiohttp.ClientResponse] = None
        data: Optional[Dict[str, Any]] = None

        if self.__session is MISSING:
            self.__session = aiohttp.ClientSession()

        for tries in range(RETRY_MAX):
            try:
                async with self.__session.request(method, url, **kwargs) as response:
                    status = response.status

                    if 300 > status >= 200:
                        # presumably to_data() yields a mapping with a 'content' key
                        # -- confirm against utils.to_data
                        data = await utils.to_data(response)

                        if not data['content'] or status != 200:
                            raise UIDNotFounded(f"UID {uid} not found.")

                        self.LOGGER.debug('%s %s has received %s', method, url, data)
                        return data

                    # A 403 will not succeed on retry; surface it immediately.
                    # (Previously this branch was unreachable behind the retry
                    # `continue` below.)
                    if status == 403:
                        # TODO: come up with a more descriptive message
                        raise Forbidden("Forbidden 403")

                    # NOTE: 429 (rate limited) has no dedicated handling; it is
                    # retried like every other status >= 400.
                    if status >= 400:
                        self.LOGGER.warning(
                            "Failure to fetch %s (%s) Retry %s / %s",
                            url, status, tries, RETRY_MAX
                        )
                        # Back off only if another attempt follows; after the
                        # final attempt fall out of the loop and raise below.
                        if tries + 1 < RETRY_MAX:
                            await asyncio.sleep(1 + tries * 2)
                        continue

                    # Status below 200 or in the 3xx range: not handled.
                    raise HTTPException("Unknown error")
            except OSError as e:
                # Connection reset by peer (errno 54 / 10054)
                if tries < 4 and e.errno in (54, 10054):
                    await asyncio.sleep(1 + tries * 2)
                    continue
                raise

        if response is not None:
            # We've run out of retries, raise.
            if response.status >= 500:
                raise EnkaServerError("Server error")

            raise HTTPException("Unknown error")

        raise RuntimeError('Unreachable code in HTTP handling')

    def fetch_user(self, uid: Union[str, int]) -> Response[EnkaNetworkPayload]:
        """Return the (un-awaited) request coroutine for a user's data.

        The configured key, when non-empty, is appended as the ``key`` query
        parameter.

        Raises:
            VaildateUIDError: ``utils.validate_uid`` rejected ``str(uid)``.
        """
        if not utils.validate_uid(str(uid)):
            raise VaildateUIDError("Validate UID failed. Please check your UID.")

        url = f'/u/{uid}/__data.json'
        if self.__key:
            # Must interpolate the real key; a plain "{key}" literal was sent before.
            url += f"?key={self.__key}"

        return self.request(Route('GET', url, 'enka', uid))

    async def update_asset(self, path: dict) -> None:
        """Re-download every file already present in each asset folder.

        ``path`` maps a folder name (used in the remote URL) to a local
        directory. Each file listed in that directory is fetched from
        ``master/exports/{folder}/{filename}`` and overwritten with the
        response's ``"content"`` serialized as JSON.
        """
        self.LOGGER.debug("Downloading new content...")
        for folder, directory in path.items():
            for filename in os.listdir(directory):
                self.LOGGER.debug("Downloading %s file %s...", folder, filename)

                # get new assets
                url = f"master/exports/{folder}/{filename}"
                data = await self.request(Route('GET', url, 'assets'))

                self.LOGGER.debug("Writing %s file %s...", folder, filename)

                # dumps to json file
                with open(os.path.join(directory, filename), "w", encoding="utf-8") as f:
                    json.dump(data["content"], f, ensure_ascii=False, indent=4)