mirror of
https://github.com/PaiGramTeam/PamGram.git
synced 2024-11-22 22:37:21 +00:00
602ce67e41
`apps` → `core`
39 lines
1.2 KiB
Python
39 lines
1.2 KiB
Python
from typing import List
|
|
|
|
from utils.redisdb import RedisDB
|
|
|
|
|
|
class BotAdminCache:
|
|
def __init__(self, redis: RedisDB):
|
|
self.client = redis.client
|
|
self.qname = "bot:admin"
|
|
|
|
async def get_list(self):
|
|
return [int(str_data) for str_data in await self.client.lrange(self.qname, 0, -1)]
|
|
|
|
async def set_list(self, str_list: List[int], ttl: int = -1):
|
|
await self.client.ltrim(self.qname, 1, 0)
|
|
await self.client.lpush(self.qname, *str_list)
|
|
if ttl != -1:
|
|
await self.client.expire(self.qname, ttl)
|
|
count = await self.client.llen(self.qname)
|
|
return count
|
|
|
|
|
|
class GroupAdminCache:
|
|
def __init__(self, redis: RedisDB):
|
|
self.client = redis.client
|
|
self.qname = "group:admin_list"
|
|
|
|
async def get_chat_admin(self, chat_id: int):
|
|
qname = f"{self.qname}:{chat_id}"
|
|
return [int(str_id) for str_id in await self.client.lrange(qname, 0, -1)]
|
|
|
|
async def set_chat_admin(self, chat_id: int, admin_list: List[int]):
|
|
qname = f"{self.qname}:{chat_id}"
|
|
await self.client.ltrim(qname, 1, 0)
|
|
await self.client.lpush(qname, *admin_list)
|
|
await self.client.expire(qname, 60)
|
|
count = await self.client.llen(qname)
|
|
return count
|