mirror of
https://github.com/PaiGramTeam/EnkaNetwork.py.git
synced 2024-11-16 03:45:28 +00:00
Merge pull request #40 from mrwan200/dev
## Update 1.4.1
- Change Catch class to baseCache class
- Fix `RuntimeWarning` error
- Add sync_build and merge_raw_data
- Add new example
- Fix little bit
Thank @GauravM512 and @luoshuijs for help and. guild this ❤️🔥
This commit is contained in:
commit
59cd9cff54
6
.gitignore
vendored
6
.gitignore
vendored
@ -1,10 +1,12 @@
|
||||
__pycache__
|
||||
enkanetwork.egg-info
|
||||
build
|
||||
./build
|
||||
enkanetwork.py.egg-info
|
||||
dist
|
||||
test/*.test.py
|
||||
.pytest_cache
|
||||
.idea
|
||||
git-story_media
|
||||
env
|
||||
env
|
||||
export.json
|
||||
843715177.json
|
@ -24,7 +24,7 @@ DEALINGS IN THE SOFTWARE."""
|
||||
|
||||
__title__ = 'enkanetwork.py'
|
||||
__author__ = 'M-307'
|
||||
__version__ = '1.4.0'
|
||||
__version__ = '1.4.1'
|
||||
__license__ = 'MIT'
|
||||
__copyright__ = 'Copyright 2022-present M-307'
|
||||
|
||||
|
@ -1,11 +1,18 @@
|
||||
import json
|
||||
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Optional
|
||||
from cachetools import TTLCache
|
||||
|
||||
__all__ = ('Cache',)
|
||||
__all__ = ('StaticCache','Cache')
|
||||
|
||||
class Cache:
|
||||
async def get(self, key: str) -> Optional[Dict[str, Any]]:
|
||||
pass
|
||||
|
||||
async def set(self, key: str, value: Dict[str, Any]) -> None:
|
||||
pass
|
||||
|
||||
class StaticCache:
|
||||
def __init__(self, maxsize: int, ttl: int) -> None:
|
||||
self.cache = TTLCache(maxsize, ttl)
|
||||
|
||||
|
@ -4,6 +4,8 @@ import os
|
||||
import json
|
||||
import logging
|
||||
|
||||
from typing import Union, Optional, Type, TYPE_CHECKING, List, Any, Dict
|
||||
|
||||
from .http import HTTPClient
|
||||
from .model.base import (
|
||||
EnkaNetworkResponse,
|
||||
@ -14,10 +16,11 @@ from .model.build import Builds
|
||||
|
||||
from .assets import Assets
|
||||
from .enum import Language
|
||||
from .cache import Cache
|
||||
from .cache import Cache, StaticCache
|
||||
from .config import Config
|
||||
|
||||
from typing import Union, Optional, Type, TYPE_CHECKING, List, Any, Callable, Dict
|
||||
from .tools import (
|
||||
merge_raw_data
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing_extensions import Self
|
||||
@ -72,7 +75,16 @@ class EnkaNetworkAPI:
|
||||
"""
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, *, lang: str = "en", debug: bool = False, key: str = "", cache: bool = True, user_agent: str = "", timeout: int = 10) -> None: # noqa: E501
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
lang: str = "en",
|
||||
debug: bool = False,
|
||||
key: str = "",
|
||||
cache: bool = True,
|
||||
user_agent: str = "",
|
||||
timeout: int = 10
|
||||
) -> None: # noqa: E501
|
||||
# Logging
|
||||
logging.basicConfig()
|
||||
logging.getLogger("enkanetwork").setLevel(logging.DEBUG if debug else logging.ERROR) # noqa: E501
|
||||
@ -83,7 +95,7 @@ class EnkaNetworkAPI:
|
||||
# Cache
|
||||
self._enable_cache = cache
|
||||
if self._enable_cache:
|
||||
Config.init_cache(Cache(1024, 60 * 1))
|
||||
Config.init_cache(StaticCache(1024, 60 * 1))
|
||||
|
||||
# http client
|
||||
self.__http = HTTPClient(key=key, agent=user_agent, timeout=timeout)
|
||||
@ -98,8 +110,8 @@ class EnkaNetworkAPI:
|
||||
exc_value: Optional[BaseException],
|
||||
traceback: Optional[TracebackType],
|
||||
) -> None:
|
||||
self._close = True
|
||||
if self._close:
|
||||
self._closed = True
|
||||
if self._closed:
|
||||
await self.__http.close()
|
||||
|
||||
def is_closed(self) -> bool:
|
||||
@ -162,10 +174,21 @@ class EnkaNetworkAPI:
|
||||
:class:`EnkaNetworkResponse`
|
||||
The response player data
|
||||
"""
|
||||
func = self.__http.fetch_user_by_uid(uid, info=info)
|
||||
data = await self.request_enka(func, uid)
|
||||
|
||||
# Loda cache
|
||||
cache = await self.__get_cache(uid)
|
||||
if cache:
|
||||
return EnkaNetworkResponse.parse_obj(cache)
|
||||
|
||||
data = await self.__http.fetch_user_by_uid(uid, info=info)
|
||||
data = self.__format_json(data)
|
||||
|
||||
# Return data
|
||||
self.LOGGER.debug("Parsing data...")
|
||||
|
||||
# Store cache
|
||||
await self.__store_cache(uid,data)
|
||||
|
||||
if "owner" in data:
|
||||
data["owner"] = {
|
||||
**data["owner"],
|
||||
@ -206,14 +229,26 @@ class EnkaNetworkAPI:
|
||||
:class:`EnkaNetworkProfileResponse`
|
||||
The response profile / hoyos and builds data
|
||||
"""
|
||||
func = self.__http.fetch_user_by_username(profile_id)
|
||||
data = await self.request_enka(func, profile_id)
|
||||
# Loda cache
|
||||
cache = await self.__get_cache(profile_id)
|
||||
if cache:
|
||||
return EnkaNetworkProfileResponse.parse_obj(cache)
|
||||
|
||||
data = await self.__http.fetch_user_by_username(profile_id)
|
||||
data = self.__format_json(data)
|
||||
|
||||
self.LOGGER.debug("Parsing data...")
|
||||
|
||||
return EnkaNetworkProfileResponse.parse_obj({
|
||||
# Store cache
|
||||
await self.__store_cache(profile_id,data)
|
||||
|
||||
# Fetch hoyos and build(s)
|
||||
data = {
|
||||
**data,
|
||||
"hoyos": await self.fetch_hoyos_by_username(profile_id)
|
||||
})
|
||||
}
|
||||
|
||||
return EnkaNetworkProfileResponse.parse_obj(data)
|
||||
|
||||
async def fetch_hoyos_by_username(
|
||||
self,
|
||||
@ -241,14 +276,23 @@ class EnkaNetworkAPI:
|
||||
|
||||
Returns
|
||||
------------
|
||||
List[:class:`PlayerHoyos`
|
||||
List[:class:`PlayerHoyos`]
|
||||
A response hoyos player data
|
||||
"""
|
||||
key = profile_id + ":hoyos"
|
||||
func = self.__http.fetch_hoyos_by_username(profile_id)
|
||||
data = await self.request_enka(func, key)
|
||||
|
||||
# Loda cache
|
||||
cache = await self.__get_cache(key)
|
||||
if cache:
|
||||
return self.__format_hoyos(profile_id, cache)
|
||||
|
||||
data = await self.__http.fetch_hoyos_by_username(profile_id)
|
||||
data = self.__format_json(data)
|
||||
self.LOGGER.debug("Parsing data...")
|
||||
|
||||
# Store cache
|
||||
await self.__store_cache(key, data)
|
||||
|
||||
return await self.__format_hoyos(profile_id, data)
|
||||
|
||||
async def fetch_builds(
|
||||
@ -285,37 +329,57 @@ class EnkaNetworkAPI:
|
||||
A response builds data
|
||||
"""
|
||||
key = profile_id + ":hoyos:" + metaname + ":builds"
|
||||
func = self.__http.fetch_hoyos_by_username(profile_id, metaname, True)
|
||||
data = await self.request_enka(func, key)
|
||||
# Loda cache
|
||||
cache = await self.__get_cache(key)
|
||||
if cache:
|
||||
return Builds.parse_obj(cache)
|
||||
|
||||
data = await self.__http.fetch_hoyos_by_username(profile_id, metaname, True)
|
||||
data = self.__format_json(data)
|
||||
self.LOGGER.debug("Parsing data...")
|
||||
|
||||
# Store cache
|
||||
await self.__store_cache(key, data)
|
||||
|
||||
return Builds.parse_obj(data)
|
||||
|
||||
async def request_enka(
|
||||
self,
|
||||
func: Callable,
|
||||
cache_key: str,
|
||||
):
|
||||
key = cache_key
|
||||
# Check config
|
||||
if Config.CACHE_ENABLED:
|
||||
self.LOGGER.warning(f"Getting data {key} from cache...")
|
||||
data = await Config.CACHE.get(key)
|
||||
async def fetch_raw_data(self, uid: Union[str, int], *, info: bool = False) -> Dict[str, Any]:
|
||||
"""Fetches raw data for a user with the given UID.
|
||||
|
||||
if data is not None:
|
||||
self.LOGGER.debug("Parsing data...")
|
||||
return data
|
||||
c
|
||||
"""
|
||||
|
||||
user = await func
|
||||
data = user["content"]
|
||||
data = json.loads(data)
|
||||
# Loda cache
|
||||
cache = await self.__get_cache(uid)
|
||||
if cache:
|
||||
return EnkaNetworkResponse.parse_obj(cache)
|
||||
|
||||
data = await self.__http.fetch_user_by_uid(uid, info=info)
|
||||
data = self.__format_json(data)
|
||||
|
||||
if Config.CACHE_ENABLED:
|
||||
self.LOGGER.debug(f"Caching data {key}...")
|
||||
await Config.CACHE.set(key, data)
|
||||
# Store cache
|
||||
await self.__store_cache(uid,data, cache=cache)
|
||||
|
||||
return data
|
||||
|
||||
async def sync_build(self, uid: Union[str, int], old_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
""" Sync build data
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uid: Union[:class:`str`,:class:`int`]
|
||||
The UID of the user to fetch data for.
|
||||
old_data: Dict[:class:`str`, Any]
|
||||
The build old data.
|
||||
|
||||
Returns
|
||||
------
|
||||
A dictionary containing the merged data.
|
||||
"""
|
||||
|
||||
new_data = await self.fetch_raw_data(uid)
|
||||
return await merge_raw_data(new_data, old_data)
|
||||
|
||||
async def update_assets(self) -> None:
|
||||
print("Updating assets...")
|
||||
self.LOGGER.debug("Downloading new content...")
|
||||
@ -343,62 +407,35 @@ class EnkaNetworkAPI:
|
||||
**data[key]
|
||||
}) for key in data]
|
||||
|
||||
async def fetch_raw_data(self, uid: int) -> Dict[str, Any]:
|
||||
"""Fetches raw data for a user with the given UID.
|
||||
def __format_json(self, data: Any):
|
||||
data = data["content"]
|
||||
return json.loads(data)
|
||||
|
||||
async def __get_cache(
|
||||
self,
|
||||
cache_key: str,
|
||||
):
|
||||
key = cache_key
|
||||
# Check config
|
||||
if Config.CACHE_ENABLED:
|
||||
self.LOGGER.warning(f"Getting data {key} from cache...")
|
||||
data = await Config.CACHE.get(key)
|
||||
|
||||
if data is not None:
|
||||
self.LOGGER.debug("Parsing data...")
|
||||
return data
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uid: The UID of the user to fetch data for.
|
||||
|
||||
Returns
|
||||
------
|
||||
A dictionary containing the raw data for the user.
|
||||
"""
|
||||
func = self.__http.fetch_user_by_uid(uid)
|
||||
data = await self.request_enka(func, uid)
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
async def merge_raw_data(
|
||||
new_data: Dict[str, Any], cache_data: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Merge cached data into newly fetched data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
new_data: The newly fetched data as a dictionary.
|
||||
cache_data: The cached data as a dictionary.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary containing the merged data.
|
||||
"""
|
||||
|
||||
async def combine_lists(
|
||||
new_list: List[Dict[str, Any]], cache_list: List[Dict[str, Any]]
|
||||
):
|
||||
new_ids = {item["avatarId"] for item in new_list}
|
||||
unique_cache_items = [
|
||||
item for item in cache_list if item["avatarId"] not in new_ids
|
||||
]
|
||||
new_list.extend(unique_cache_items)
|
||||
|
||||
if "showAvatarInfoList" in cache_data["playerInfo"]:
|
||||
new_data.setdefault("playerInfo", {}).setdefault("showAvatarInfoList", [])
|
||||
await combine_lists(
|
||||
new_data["playerInfo"]["showAvatarInfoList"],
|
||||
cache_data["playerInfo"]["showAvatarInfoList"],
|
||||
)
|
||||
|
||||
if "avatarInfoList" in cache_data:
|
||||
new_data.setdefault("avatarInfoList", [])
|
||||
await combine_lists(
|
||||
new_data["avatarInfoList"], cache_data["avatarInfoList"]
|
||||
)
|
||||
|
||||
return new_data
|
||||
async def __store_cache(self, key: str, data: Any, *, cache: Any = None):
|
||||
if Config.CACHE_ENABLED:
|
||||
self.LOGGER.debug(f"Caching data {key}...")
|
||||
if cache is None:
|
||||
await Config.CACHE.set(key, data)
|
||||
else:
|
||||
await Config.CACHE.set(key, await self.merge_raw_data(data, cache_data=cache))
|
||||
|
||||
# Concept by genshin.py python library
|
||||
fetch_user = fetch_user_by_uid
|
||||
fetch_profile = fetch_user_by_username
|
||||
merge_raw_data = merge_raw_data
|
||||
|
@ -2,7 +2,7 @@ from typing import (
|
||||
ClassVar
|
||||
)
|
||||
from .utils import get_user_agent
|
||||
from .cache import Cache
|
||||
from .cache import Cache, StaticCache
|
||||
|
||||
class Config:
|
||||
# HTTP Config
|
||||
@ -15,7 +15,7 @@ class Config:
|
||||
USER_AGENT: ClassVar[str] = get_user_agent()
|
||||
# Client config
|
||||
CACHE_ENABLED: ClassVar[bool] = True
|
||||
CACHE: ClassVar[Cache] = Cache(1024, 60 * 3)
|
||||
CACHE: ClassVar[Cache] = StaticCache(1024, 60 * 3)
|
||||
|
||||
@classmethod
|
||||
def init_cache(
|
||||
|
@ -159,7 +159,7 @@ class HTTPClient:
|
||||
|
||||
raise RuntimeError('Unreachable code in HTTP handling')
|
||||
|
||||
async def fetch_user_by_uid(
|
||||
def fetch_user_by_uid(
|
||||
self,
|
||||
uid: Union[str, int],
|
||||
*,
|
||||
@ -174,9 +174,9 @@ class HTTPClient:
|
||||
endpoint='enka',
|
||||
username=uid
|
||||
)
|
||||
return await self.request(r)
|
||||
return self.request(r)
|
||||
|
||||
async def fetch_user_by_username(
|
||||
def fetch_user_by_username(
|
||||
self,
|
||||
username: Union[str, int]
|
||||
) -> Response[EnkaNetworkPayload]:
|
||||
@ -186,9 +186,9 @@ class HTTPClient:
|
||||
endpoint='enka',
|
||||
username=username
|
||||
)
|
||||
return await self.request(r)
|
||||
return self.request(r)
|
||||
|
||||
async def fetch_hoyos_by_username(
|
||||
def fetch_hoyos_by_username(
|
||||
self,
|
||||
username: Union[str, int],
|
||||
metaname: str = "",
|
||||
@ -202,16 +202,16 @@ class HTTPClient:
|
||||
endpoint='enka',
|
||||
username=username
|
||||
)
|
||||
return await self.request(r)
|
||||
return self.request(r)
|
||||
|
||||
|
||||
async def fetch_asset(self, folder: str, filename: str) -> Response[DefaultPayload]:
|
||||
def fetch_asset(self, folder: str, filename: str) -> Response[DefaultPayload]:
|
||||
r = Route(
|
||||
'GET',
|
||||
f'/mrwan200/enkanetwork.py-data/master/exports/{folder}/{filename}',
|
||||
endpoint='assets'
|
||||
)
|
||||
return await self.request(r)
|
||||
return self.request(r)
|
||||
|
||||
async def read_from_url(self, url: str) -> bytes:
|
||||
async with self.__session.get(url) as resp:
|
||||
|
43
enkanetwork/tools.py
Normal file
43
enkanetwork/tools.py
Normal file
@ -0,0 +1,43 @@
|
||||
from typing import List, Any, Dict
|
||||
|
||||
async def merge_raw_data(
|
||||
new_data: Dict[str, Any],
|
||||
cache_data: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Merge cached data into newly fetched data.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
new_data: The newly fetched data as a dictionary.
|
||||
cache_data: The cached data as a dictionary.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary containing the merged data.
|
||||
"""
|
||||
|
||||
async def combine_lists(
|
||||
new_list: List[Dict[str, Any]], cache_list: List[Dict[str, Any]]
|
||||
):
|
||||
new_ids = {item["avatarId"] for item in new_list}
|
||||
unique_cache_items = [
|
||||
item for item in cache_list if item["avatarId"] not in new_ids
|
||||
]
|
||||
new_list.extend(unique_cache_items)
|
||||
|
||||
if "showAvatarInfoList" in cache_data["playerInfo"]:
|
||||
new_data.setdefault("playerInfo", {}).setdefault(
|
||||
"showAvatarInfoList", [])
|
||||
await combine_lists(
|
||||
new_data["playerInfo"]["showAvatarInfoList"],
|
||||
cache_data["playerInfo"]["showAvatarInfoList"],
|
||||
)
|
||||
|
||||
if "avatarInfoList" in cache_data:
|
||||
new_data.setdefault("avatarInfoList", [])
|
||||
await combine_lists(
|
||||
new_data["avatarInfoList"], cache_data["avatarInfoList"]
|
||||
)
|
||||
|
||||
return new_data
|
16
example/build/export_build.py
Normal file
16
example/build/export_build.py
Normal file
@ -0,0 +1,16 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from enkanetwork import EnkaNetworkAPI
|
||||
|
||||
client = EnkaNetworkAPI(lang="th")
|
||||
|
||||
async def main():
|
||||
async with client:
|
||||
# Fetch data
|
||||
raw = await client.fetch_raw_data(843715177)
|
||||
# Write JSON file
|
||||
with open(f"./{raw['uid']}.json", "w", encoding="utf-8") as w:
|
||||
json.dump(raw, w, indent=4)
|
||||
|
||||
asyncio.run(main())
|
9
example/build/import_build.py
Normal file
9
example/build/import_build.py
Normal file
@ -0,0 +1,9 @@
|
||||
import json
|
||||
|
||||
from enkanetwork.model.base import EnkaNetworkResponse
|
||||
|
||||
UID = "843715177"
|
||||
|
||||
with open(f"./{UID}.json" , "r", encoding="utf-8") as f:
|
||||
response = EnkaNetworkResponse.parse_obj(json.load(f))
|
||||
print(response)
|
18
example/build/sync/main.py
Normal file
18
example/build/sync/main.py
Normal file
@ -0,0 +1,18 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from enkanetwork import EnkaNetworkAPI
|
||||
|
||||
client = EnkaNetworkAPI(lang="th")
|
||||
|
||||
async def main():
|
||||
async with client:
|
||||
# Load old data build
|
||||
with open("./raw.json", "r") as f:
|
||||
old = json.load(f)
|
||||
export = await client.sync_build(old["uid"], old)
|
||||
# Export new data
|
||||
with open("./export.json", "w", encoding="utf-8") as w:
|
||||
json.dump(export, w, indent=4)
|
||||
|
||||
asyncio.run(main())
|
3963
example/build/sync/raw.json
Normal file
3963
example/build/sync/raw.json
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user