PamGram/plugins/tools/daily_note.py
2023-12-16 18:10:42 +08:00

370 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
from datetime import datetime
from typing import TYPE_CHECKING, List, Optional
from pydantic import BaseModel, validator
from simnet import Region
from simnet.errors import BadRequest as SimnetBadRequest, InvalidCookies, TimedOut as SimnetTimedOut
from sqlalchemy.orm.exc import StaleDataError
from telegram.constants import ParseMode
from telegram.error import BadRequest, Forbidden
from core.plugin import Plugin
from core.services.task.models import Task as TaskUser, TaskStatusEnum
from core.services.task.services import TaskResinServices, TaskExpeditionServices
from gram_core.plugin.methods.migrate_data import IMigrateData, MigrateDataException
from plugins.tools.genshin import GenshinHelper, PlayerNotFoundError, CookiesNotFoundError
from utils.log import logger
if TYPE_CHECKING:
from simnet import StarRailClient
from telegram.ext import ContextTypes
from gram_core.services.task.services import TaskServices
class TaskDataBase(BaseModel):
noticed: Optional[bool] = False
class ResinData(TaskDataBase):
notice_num: Optional[int] = 140
@validator("notice_num")
def notice_num_validator(cls, v):
if v < 100 or v > 240:
raise ValueError("开拓力提醒数值必须在 100 ~ 240 之间")
return v
class ExpeditionData(TaskDataBase):
pass
class WebAppData(BaseModel):
resin: Optional[ResinData]
expedition: Optional[ExpeditionData]
class DailyNoteTaskUser:
def __init__(
self,
user_id: int,
resin_db: Optional[TaskUser] = None,
expedition_db: Optional[TaskUser] = None,
):
self.user_id = user_id
self.resin_db = resin_db
self.expedition_db = expedition_db
self.resin = ResinData(**self.resin_db.data) if self.resin_db else None
self.expedition = ExpeditionData(**self.expedition_db.data) if self.expedition_db else None
@property
def status(self) -> TaskStatusEnum:
return max(
[
self.resin_db.status if self.resin_db else TaskStatusEnum.STATUS_SUCCESS,
self.expedition_db.status if self.expedition_db else TaskStatusEnum.STATUS_SUCCESS,
]
)
@status.setter
def status(self, value: TaskStatusEnum):
if self.resin_db:
self.resin_db.status = value
if self.expedition_db:
self.expedition_db.status = value
@staticmethod
def js_bool(value: bool) -> str:
return "true" if value else "false"
@staticmethod
def set_model_noticed(model: TaskDataBase):
data = model.copy(deep=True)
data.noticed = True
return data
@property
def web_config(self) -> str:
return base64.b64encode(
(
WebAppData(
resin=self.set_model_noticed(self.resin) if self.resin else None,
expedition=self.set_model_noticed(self.expedition) if self.expedition else None,
).json()
).encode()
).decode()
def save(self):
if self.resin_db:
self.resin_db.data = self.resin.dict()
if self.expedition_db:
self.expedition_db.data = self.expedition.dict()
class DailyNoteSystem(Plugin):
def __init__(
self,
genshin_helper: GenshinHelper,
resin_service: TaskResinServices,
expedition_service: TaskExpeditionServices,
):
self.genshin_helper = genshin_helper
self.resin_service = resin_service
self.expedition_service = expedition_service
async def get_single_task_user(self, user_id: int) -> DailyNoteTaskUser:
resin_db = await self.resin_service.get_by_user_id(user_id)
expedition_db = await self.expedition_service.get_by_user_id(user_id)
return DailyNoteTaskUser(
user_id=user_id,
resin_db=resin_db,
expedition_db=expedition_db,
)
@staticmethod
async def start_get_notes(
client: "StarRailClient",
user: DailyNoteTaskUser = None,
) -> List[str]:
if client.region == Region.OVERSEAS:
notes = await client.get_starrail_notes()
else:
notes = await client.get_starrail_notes_by_stoken()
if not user:
return []
notices = []
notice = None
if user.resin_db and notes.max_stamina > 0:
if notes.current_stamina >= user.resin.notice_num:
if not user.resin.noticed:
rec_time = datetime.now().astimezone() + notes.stamina_recover_time
notice = (
f"### 开拓力提示 ####\n\n当前开拓力为 {notes.current_stamina} / {notes.max_stamina} ,记得使用哦~\n"
f"预计全部恢复完成:{rec_time.strftime('%Y-%m-%d %H:%M')}"
)
user.resin.noticed = True
else:
user.resin.noticed = False
notices.append(notice)
notice = None
if user.expedition_db and len(notes.expeditions) > 0:
all_finished = all(i.status == "Finished" for i in notes.expeditions)
if all_finished:
if not user.expedition.noticed:
notice = "### 探索派遣提示 ####\n\n所有探索派遣已完成,记得重新派遣哦~"
user.expedition.noticed = True
else:
user.expedition.noticed = False
notices.append(notice)
user.save()
return notices
async def get_all_task_users(self) -> List[DailyNoteTaskUser]:
resin_list = await self.resin_service.get_all()
expedition_list = await self.expedition_service.get_all()
user_list = set()
for i in resin_list:
user_list.add(i.user_id)
for i in expedition_list:
user_list.add(i.user_id)
return [
DailyNoteTaskUser(
user_id=i,
resin_db=next((x for x in resin_list if x.user_id == i), None),
expedition_db=next((x for x in expedition_list if x.user_id == i), None),
)
for i in user_list
]
async def remove_task_user(self, user: DailyNoteTaskUser):
if user.resin_db:
await self.resin_service.remove(user.resin_db)
if user.expedition_db:
await self.expedition_service.remove(user.expedition_db)
async def update_task_user(self, user: DailyNoteTaskUser):
if user.resin_db:
try:
await self.resin_service.update(user.resin_db)
except StaleDataError:
logger.warning("用户 user_id[%s] 自动便签提醒 - 开拓力数据过期,跳过更新数据", user.user_id)
if user.expedition_db:
try:
await self.expedition_service.update(user.expedition_db)
except StaleDataError:
logger.warning("用户 user_id[%s] 自动便签提醒 - 探索派遣数据过期,跳过更新数据", user.user_id)
@staticmethod
async def check_need_note(web_config: WebAppData) -> bool:
need_verify = False
if web_config.resin and web_config.resin.noticed:
need_verify = True
if web_config.expedition and web_config.expedition.noticed:
need_verify = True
return need_verify
async def import_web_config(self, user_id: int, web_config: WebAppData):
user = await self.get_single_task_user(user_id)
if web_config.resin:
if web_config.resin.noticed:
if not user.resin_db:
resin = self.resin_service.create(
user_id,
user_id,
status=TaskStatusEnum.STATUS_SUCCESS,
data=ResinData(notice_num=web_config.resin.notice_num).dict(),
)
await self.resin_service.add(resin)
else:
user.resin.notice_num = web_config.resin.notice_num
user.resin.noticed = False
else:
if user.resin_db:
await self.resin_service.remove(user.resin_db)
user.resin_db = None
user.resin = None
if web_config.expedition:
if web_config.expedition.noticed:
if not user.expedition_db:
expedition = self.expedition_service.create(
user_id,
user_id,
status=TaskStatusEnum.STATUS_SUCCESS,
data=ExpeditionData().dict(),
)
await self.expedition_service.add(expedition)
else:
user.expedition.noticed = False
else:
if user.expedition_db:
await self.expedition_service.remove(user.expedition_db)
user.expedition_db = None
user.expedition = None
user.save()
await self.update_task_user(user)
async def do_get_notes_job(self, context: "ContextTypes.DEFAULT_TYPE"):
include_status: List[TaskStatusEnum] = [
TaskStatusEnum.STATUS_SUCCESS,
TaskStatusEnum.TIMEOUT_ERROR,
]
task_list = await self.get_all_task_users()
for task_db in task_list:
if task_db.status not in include_status:
continue
user_id = task_db.user_id
logger.info("自动便签提醒 - 请求便签信息 user_id[%s]", user_id)
try:
async with self.genshin_helper.genshin(user_id) as client:
text = await self.start_get_notes(client, task_db)
except InvalidCookies:
text = "自动便签提醒执行失败Cookie无效"
task_db.status = TaskStatusEnum.INVALID_COOKIES
except SimnetBadRequest as exc:
text = f"自动便签提醒执行失败API返回信息为 {str(exc)}"
task_db.status = TaskStatusEnum.GENSHIN_EXCEPTION
except SimnetTimedOut:
logger.info("用户 user_id[%s] 请求便签超时", user_id)
continue
except PlayerNotFoundError:
logger.info("用户 user_id[%s] 玩家不存在 关闭并移除自动便签提醒", user_id)
await self.remove_task_user(task_db)
continue
except CookiesNotFoundError:
logger.info("用户 user_id[%s] cookie 不存在 关闭并移除自动便签提醒", user_id)
await self.remove_task_user(task_db)
continue
except Exception as exc:
logger.error("执行自动便签提醒时发生错误 user_id[%s]", user_id, exc_info=exc)
text = "获取便签失败了呜呜呜 ~ 执行自动便签提醒时发生错误"
else:
task_db.status = TaskStatusEnum.STATUS_SUCCESS
for idx, task_user_db in enumerate([task_db.resin_db, task_db.expedition_db]):
if task_user_db is None:
continue
notice_text = text[idx] if isinstance(text, list) else text
if not notice_text:
continue
if task_user_db.chat_id < 0:
notice_text = (
f'<a href="tg://user?id={task_user_db.user_id}">'
f"NOTICE {task_user_db.user_id}</a>\n\n{notice_text}"
)
try:
await context.bot.send_message(task_user_db.chat_id, notice_text, parse_mode=ParseMode.HTML)
except BadRequest as exc:
logger.error("执行自动便签提醒时发生错误 user_id[%s] Message[%s]", user_id, exc.message)
task_user_db.status = TaskStatusEnum.BAD_REQUEST
except Forbidden as exc:
logger.error("执行自动便签提醒时发生错误 user_id[%s] message[%s]", user_id, exc.message)
task_user_db.status = TaskStatusEnum.FORBIDDEN
except Exception as exc:
logger.error("执行自动便签提醒时发生错误 user_id[%s]", user_id, exc_info=exc)
continue
else:
task_user_db.status = TaskStatusEnum.STATUS_SUCCESS
await self.update_task_user(task_db)
async def get_migrate_data(self, old_user_id: int, new_user_id: int, _) -> Optional["TaskMigrate"]:
return await TaskMigrate.create(
old_user_id,
new_user_id,
self.resin_service,
)
class TaskMigrate(IMigrateData):
old_user_id: int
new_user_id: int
task_services: "TaskServices"
need_migrate_tasks: List[TaskUser]
async def migrate_data_msg(self) -> str:
return f"定时任务数据 {len(self.need_migrate_tasks)}"
async def migrate_data(self) -> bool:
id_list = []
for task in self.need_migrate_tasks:
try:
await self.task_services.update(task)
except StaleDataError:
id_list.append(str(task.id))
if id_list:
raise MigrateDataException(f"任务数据迁移失败id {','.join(id_list)}")
return True
@staticmethod
async def create_tasks(
old_user_id: int,
new_user_id: int,
task_services: "TaskServices",
) -> List[TaskUser]:
need_migrate, _ = await TaskMigrate.filter_sql_data(
TaskUser,
task_services.get_all_by_user_id,
old_user_id,
new_user_id,
(TaskUser.user_id, TaskUser.type),
)
for i in need_migrate:
i.user_id = new_user_id
return need_migrate
@classmethod
async def create(
cls,
old_user_id: int,
new_user_id: int,
task_services: "TaskServices",
) -> Optional["TaskMigrate"]:
need_migrate_tasks = await cls.create_tasks(old_user_id, new_user_id, task_services)
if not need_migrate_tasks:
return None
self = cls()
self.old_user_id = old_user_id
self.new_user_id = new_user_id
self.task_services = task_services
self.need_migrate_tasks = need_migrate_tasks
return self