2022-08-02 17:37:24 +00:00
|
|
|
import json
|
|
|
|
|
2023-02-16 18:03:33 +00:00
|
|
|
from typing import Any, Dict, Optional
|
2022-08-02 17:37:24 +00:00
|
|
|
from cachetools import TTLCache
|
|
|
|
|
2023-02-16 18:03:33 +00:00
|
|
|
# Names exported by ``from <module> import *``.
__all__ = (
    "StaticCache",
    "Cache",
)
|
2022-08-02 17:40:30 +00:00
|
|
|
|
2022-08-02 17:37:24 +00:00
|
|
|
class Cache:
    """Placeholder cache whose coroutine methods do nothing.

    NOTE(review): presumably this is the interface concrete caches are
    expected to follow -- confirm against callers.
    """

    async def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Look up *key*; this placeholder always resolves to ``None``."""
        return None

    async def set(self, key: str, value: Dict[str, Any]) -> None:
        """Accept *value* for *key*; this placeholder stores nothing."""
        return None
|
|
|
|
|
|
|
|
class StaticCache:
    """Cache that keeps JSON-encoded values in a ``cachetools.TTLCache``.

    Values are serialised with ``json.dumps`` on write and parsed with
    ``json.loads`` on read, so each ``get`` returns a freshly decoded
    object rather than the one originally passed to ``set``.
    """

    def __init__(self, maxsize: int, ttl: int) -> None:
        """Create the backing store.

        Args:
            maxsize: Forwarded to ``TTLCache`` as its first argument.
            ttl: Forwarded to ``TTLCache`` as its second argument.
        """
        self.cache = TTLCache(maxsize, ttl)

    async def get(self, key: str) -> Optional[Dict[str, Any]]:
        """Return the decoded value stored under *key*.

        Returns:
            The JSON-decoded value, or ``None`` when the backing store has
            no entry for *key*.
        """
        raw = self.cache.get(key)
        if raw is None:
            # Cache miss: nothing to decode.
            return None
        return json.loads(raw)

    async def set(self, key: str, value: Dict[str, Any]) -> None:
        """Store *value* under *key* as JSON text.

        Raises:
            TypeError: If *value* is not JSON-serialisable (raised by
                ``json.dumps``).
        """
        self.cache[key] = json.dumps(value)
|