PaiGram/utils/enkanetwork.py
LittleMengBot 72154924be
Update /play_card placeholder image
Update current player showcase after updating character list.
Use asynchronous locks to make file read/write thread-safe.
Update `EnkaNetworkCache`.

---------

Co-authored-by: 洛水居室 <luoshuijs@outlook.com>
2023-03-17 13:33:29 +08:00

46 lines
1.2 KiB
Python

from typing import Dict, Any, Optional, TYPE_CHECKING
from enkanetwork import Cache
try:
import ujson as jsonlib
except ImportError:
import json as jsonlib
if TYPE_CHECKING:
from redis import asyncio as aioredis
__all__ = ("RedisCache",)
class RedisCache(Cache):
def __init__(self, redis: "aioredis.Redis", key: Optional[str] = None, ex: int = 60 * 3) -> None:
self.redis = redis
self.ex = ex
self.key = key
def get_qname(self, key):
return f"{self.key}:{key}" if self.key else f"enka_network:{key}"
async def get(self, key) -> Optional[Dict[str, Any]]:
qname = self.get_qname(key)
data = await self.redis.get(qname)
if data:
json_data = str(data, encoding="utf-8")
return jsonlib.loads(json_data)
return None
async def set(self, key, value) -> None:
qname = self.get_qname(key)
data = jsonlib.dumps(value)
await self.redis.set(qname, data, ex=self.ex)
async def exists(self, key) -> int:
qname = self.get_qname(key)
return await self.redis.exists(qname)
async def ttl(self, key) -> int:
qname = self.get_qname(key)
return await self.redis.ttl(qname)