GramCore/services/cookies/services.py
2024-03-16 18:44:26 +08:00

137 lines
5.4 KiB
Python

from typing import List, Optional
from gram_core.base_service import BaseService
from gram_core.basemodel import RegionEnum
from gram_core.services.cookies.cache import PublicCookiesCache
from gram_core.services.cookies.error import TooManyRequestPublicCookies
from gram_core.services.cookies.models import CookiesDataBase as Cookies, CookiesStatusEnum
from gram_core.services.cookies.repositories import CookiesRepository
from gram_core.services.devices.repositories import DevicesRepository
from utils.log import logger
__all__ = ("CookiesService", "PublicCookiesService", "NeedContinue")
class NeedContinue(Exception):
pass
class CookiesService(BaseService):
def __init__(self, cookies_repository: CookiesRepository) -> None:
self._repository: CookiesRepository = cookies_repository
async def update(self, cookies: Cookies):
await self._repository.update(cookies)
async def add(self, cookies: Cookies):
await self._repository.add(cookies)
async def get(
self,
user_id: int,
account_id: Optional[int] = None,
region: Optional[RegionEnum] = None,
) -> Optional[Cookies]:
return await self._repository.get(user_id, account_id, region)
async def delete(self, cookies: Cookies) -> None:
return await self._repository.delete(cookies)
async def get_all(
self,
user_id: Optional[int] = None,
account_id: Optional[int] = None,
region: Optional[RegionEnum] = None,
status: Optional[CookiesStatusEnum] = None,
) -> List[Cookies]:
return await self._repository.get_all(user_id, account_id, region, status)
class PublicCookiesService:
def __init__(
self,
cookies_repository: CookiesRepository,
public_cookies_cache: PublicCookiesCache,
devices_repository: DevicesRepository,
):
self._cache = public_cookies_cache
self._repository: CookiesRepository = cookies_repository
self.devices_repository = devices_repository
self.count: int = 0
self.user_times_limiter = 3 * 3
async def initialize(self) -> None:
logger.info("正在初始化公共Cookies池")
await self.refresh()
logger.success("刷新公共Cookies池成功")
async def refresh(self):
"""刷新公共Cookies 定时任务
:return:
"""
user_list: List[int] = []
data_list = await self._repository.get_by_devices(is_valid=True)
for cookies, devices in data_list:
user_list.append(cookies.user_id)
if len(user_list) > 0:
add, count = await self._cache.add_public_cookies(user_list, RegionEnum.HYPERION)
logger.info("国服公共Cookies池已经添加[%s]个 当前成员数为[%s]", add, count)
user_list.clear()
cookies_list = await self._repository.get_all(
region=RegionEnum.HOYOLAB, status=CookiesStatusEnum.STATUS_SUCCESS
)
for cookies in cookies_list:
user_list.append(cookies.user_id)
if len(user_list) > 0:
add, count = await self._cache.add_public_cookies(user_list, RegionEnum.HOYOLAB)
logger.info("国际服公共Cookies池已经添加[%s]个 当前成员数为[%s]", add, count)
async def check_public_cookie(self, region: RegionEnum, cookies: Cookies, public_id: int):
pass
async def get_cookies(self, user_id: int, region: RegionEnum = RegionEnum.NULL):
"""获取公共Cookies
:param user_id: 用户ID
:param region: 注册的服务器
:return:
"""
user_times = await self._cache.incr_by_user_times(user_id)
if int(user_times) > self.user_times_limiter:
logger.warning("用户 %s 使用公共Cookies次数已经到达上限", user_id)
raise TooManyRequestPublicCookies(user_id)
while True:
public_id, count = await self._cache.get_public_cookies(region)
cookies = await self._repository.get(public_id, region=region)
if cookies is None:
await self._cache.delete_public_cookies(public_id, region)
continue
try:
await self.check_public_cookie(region, cookies, public_id)
logger.info(
"用户 user_id[%s] 请求用户 user_id[%s] 的公共Cookies 该Cookies使用次数为%s",
user_id,
public_id,
count,
)
return cookies
except NeedContinue:
continue
async def undo(self, user_id: int, cookies: Optional[Cookies] = None, status: Optional[CookiesStatusEnum] = None):
await self._cache.incr_by_user_times(user_id, -1)
if cookies is not None and status is not None:
cookies.status = status
await self._repository.update(cookies)
await self._cache.delete_public_cookies(cookies.user_id, cookies.region)
logger.info(
"用户 user_id[%s] 反馈用户 user_id[%s] 的Cookies状态为 %s", user_id, cookies.user_id, status.name
)
else:
logger.info("用户 user_id[%s] 撤销一次公共Cookies计数", user_id)
async def set_device_valid(self, account_id: int, is_valid: bool) -> None:
device = await self.devices_repository.get(account_id)
if device:
device.is_valid = is_valid
await self.devices_repository.update(device)