From 6adacbf5815f755bbcc2ecccf4f4b4c27084698b Mon Sep 17 00:00:00 2001 From: xtaodada Date: Sun, 15 Dec 2024 17:42:19 +0800 Subject: [PATCH] :sparkles: Support action_log import lazy refresh --- core/services/self_help/repositories.py | 16 ++++++++++++++++ core/services/self_help/services.py | 8 +++++++- plugins/jobs/import_action_log.py | 8 +++++--- plugins/tools/action_log_system.py | 17 ++++++++++------- plugins/zzz/action_log.py | 2 +- 5 files changed, 39 insertions(+), 12 deletions(-) diff --git a/core/services/self_help/repositories.py b/core/services/self_help/repositories.py index b848527..e7c9630 100644 --- a/core/services/self_help/repositories.py +++ b/core/services/self_help/repositories.py @@ -21,6 +21,22 @@ class ActionLogRepository(BaseService.Component): client: "InfluxDBClientAsync" return await client.write_api().write(self.bucket, record=p) + async def get_latest_record(self, uid: int) -> "FluxTable": + async with self.client() as client: + client: "InfluxDBClientAsync" + query = ( + 'from(bucket: "{}")' + "|> range(start: 0)" + '|> filter(fn: (r) => r["_measurement"] == "action_log")' + '|> filter(fn: (r) => r["uid"] == "{}")' + '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' + '|> sort(columns: ["_time"], desc: true)' + "|> limit(n: 1)" + ).format(self.bucket, uid) + tables = await client.query_api().query(query) + for table in tables: + return table + async def count_uptime_period(self, uid: int) -> "FluxTable": async with self.client() as client: client: "InfluxDBClientAsync" diff --git a/core/services/self_help/services.py b/core/services/self_help/services.py index d0d63cd..0bfae2a 100644 --- a/core/services/self_help/services.py +++ b/core/services/self_help/services.py @@ -1,4 +1,4 @@ -from typing import List, TYPE_CHECKING, Dict +from typing import List, TYPE_CHECKING, Dict, Optional from core.services.self_help.models import ActionLogModel from core.services.self_help.repositories import ActionLogRepository @@ -15,6 +15,12 @@ class ActionLogService(BaseService): async def add(self, p: List["ZZZSelfHelpActionLog"]) -> bool: return await self.repository.add([ActionLogModel.en(data) for data in p]) + async def get_latest_record(self, uid: int) -> Optional["ZZZSelfHelpActionLog"]: + r = await self.repository.get_latest_record(uid) + if not r: + return None + return ActionLogModel.de(r.records[0]) + async def count_uptime_period(self, uid: int) -> Dict[int, int]: """计算最近一个月不同时间点的登录次数""" data = {k: 0 for k in range(24)} diff --git a/plugins/jobs/import_action_log.py b/plugins/jobs/import_action_log.py index 5d6c260..3a9b64d 100644 --- a/plugins/jobs/import_action_log.py +++ b/plugins/jobs/import_action_log.py @@ -17,13 +17,15 @@ class ImportActionLogJob(Plugin): @job.run_daily(time=datetime.time(hour=12, minute=1, second=0), name="ImportActionLogJob") async def refresh(self, _: "ContextTypes.DEFAULT_TYPE"): - await self.action_log_system.daily_import_login(_) + await self.action_log_system.daily_import_login(True) @handler.command(command="action_log_import_all", block=False, admin=True) async def action_log_import_all(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"): user = update.effective_user - logger.info("用户 %s[%s] action_log_import_all 命令请求", user.full_name, user.id) + args = self.get_args(context) + is_lazy = "full" not in args + logger.info("用户 %s[%s] action_log_import_all 命令请求 is_lazy[%s]", user.full_name, user.id, is_lazy) message = update.effective_message reply = await message.reply_text("正在执行导入登录记录任务,请稍后...") - await self.refresh(context) + await self.action_log_system.daily_import_login(is_lazy) await reply.edit_text("全部账号导入登录记录任务完成") diff --git a/plugins/tools/action_log_system.py b/plugins/tools/action_log_system.py index b704a47..ac0316c 100644 --- a/plugins/tools/action_log_system.py +++ b/plugins/tools/action_log_system.py @@ -17,8 +17,6 @@ from plugins.tools.genshin import GenshinHelper, PlayerNotFoundError, CookiesNot from utils.log import logger if TYPE_CHECKING: - from telegram.ext import ContextTypes - from simnet import ZZZClient @@ -35,8 +33,13 @@ class ActionLogSystem(Plugin): self.helper = helper self.action_log_service = action_log_service - async def import_action_log(self, client: "ZZZClient", authkey: str) -> bool: - data = await client.get_zzz_action_log(authkey=authkey) + async def import_action_log(self, client: "ZZZClient", authkey: str, is_lazy: bool) -> bool: + min_id = 0 + if is_lazy: + record = await self.action_log_service.get_latest_record(client.player_id) + if record: + min_id = record.id + data = await client.get_zzz_action_log(authkey=authkey, min_id=min_id) # 确保第一个数据为登出、最后一条数据为登入 if not data: return False @@ -46,7 +49,7 @@ class ActionLogSystem(Plugin): data.pop(-1) return await self.action_log_service.add(data) - async def daily_import_login(self, _: "ContextTypes.DEFAULT_TYPE"): + async def daily_import_login(self, is_lazy: bool): logger.info("正在执行每日刷新登录记录任务") for cookie_model in await self.cookies.get_all( region=RegionEnum.HYPERION, status=CookiesStatusEnum.STATUS_SUCCESS @@ -57,13 +60,13 @@ class ActionLogSystem(Plugin): continue try: async with self.helper.genshin(user_id, region=RegionEnum.HYPERION) as client: - client: "StarRailClient" + client: "ZZZClient" try: authkey = await client.get_authkey_by_stoken("csc") except ValueError: logger.warning("用户 user_id[%s] 请求登录记录失败 无 stoken", user_id) continue - await self.import_action_log(client, authkey) + await self.import_action_log(client, authkey, is_lazy) except (InvalidCookies, PlayerNotFoundError, CookiesNotFoundError): continue except SimnetBadRequest as exc: diff --git a/plugins/zzz/action_log.py b/plugins/zzz/action_log.py index c32bbe7..38b216c 100644 --- a/plugins/zzz/action_log.py +++ b/plugins/zzz/action_log.py @@ -70,7 +70,7 @@ class ActionLogPlugins(Plugin): notice = await message.reply_text(f"{config.notice.bot_name}需要收集整理数据,还请耐心等待哦~") - bo = await self.action_log_system.import_action_log(client, authkey) + bo = await self.action_log_system.import_action_log(client, authkey, True) text = "导入登录记录成功" if bo else "导入登录记录失败,可能没有新记录" await notice.edit_text(text) self.log_user(update, logger.success, text)