PaiGram/utils/job/manager.py

45 lines
1.3 KiB
Python
Raw Normal View History

2022-08-11 13:18:12 +00:00
from typing import List
2022-07-26 10:07:31 +00:00
from telegram.ext import Application
from logger import Log
2022-08-11 13:18:12 +00:00
from utils.manager import ModulesManager
2022-07-26 10:07:31 +00:00
JobsClass: List[object] = []
def listener_jobs_class():
"""监听JOB
:return: None
"""
def decorator(func: object):
JobsClass.append(func)
return func
return decorator
2022-08-11 13:18:12 +00:00
class JobsManager(ModulesManager):
2022-07-26 10:07:31 +00:00
def __init__(self):
2022-08-11 13:18:12 +00:00
super().__init__()
2022-07-26 10:07:31 +00:00
self.job_list: List[str] = [] # 用于存储文件名称
self.exclude_list: List[str] = []
2022-08-11 13:18:12 +00:00
self.manager_name = "定时任务管理器"
2022-07-26 10:07:31 +00:00
@staticmethod
def add_handler(application: Application):
for func in JobsClass:
if callable(func):
try:
2022-08-04 12:58:12 +00:00
func.build_jobs(application.job_queue)
# Log.info(f"添加每日Job成功 Job名称[{handler.name}] Job每日执行时间[{handler.time.isoformat()}]")
2022-07-26 10:07:31 +00:00
except AttributeError as exc:
if "build_jobs" in str(exc):
Log.error("build_jobs 函数未找到", exc)
Log.error("初始化Class失败", exc)
except BaseException as exc:
Log.error("初始化Class失败", exc)
finally:
pass