添加 Job 相关处理

添加 `Job` 相关处理
设定全局时区为 `Asia/Shanghai`
This commit is contained in:
洛水.山岭居室 2022-07-08 10:42:07 +08:00 committed by GitHub
parent d9fa9cd466
commit 6543519dc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 319 additions and 8 deletions

View File

@ -3,7 +3,7 @@ from typing import Optional
from telegram.ext import CommandHandler, MessageHandler, filters, CallbackQueryHandler, InlineQueryHandler, Application from telegram.ext import CommandHandler, MessageHandler, filters, CallbackQueryHandler, InlineQueryHandler, Application
from logger import Log from logger import Log
from manager import PluginsManager from manager import PluginsManager, JobsManager
from plugins.auth import Auth from plugins.auth import Auth
from plugins.base import NewChatMembersHandler from plugins.base import NewChatMembersHandler
from plugins.errorhandler import error_handler from plugins.errorhandler import error_handler
@ -65,3 +65,19 @@ def register_handlers(application: Application, service: BaseService):
application.add_error_handler(error_handler, block=False) application.add_error_handler(error_handler, block=False)
Log.info("插件加载成功") Log.info("插件加载成功")
def register_job(application: Application, service: BaseService):
Log.info("正在加载Job管理器")
jobs_manager = JobsManager()
jobs_manager.refresh_list("./jobs/*")
# 忽略内置模块
jobs_manager.add_exclude(["base"])
Log.info("Job管理器正在加载插件")
jobs_manager.import_module()
jobs_manager.add_handler(application, service)
Log.info("Job加载成功")

203
jobs/base.py Normal file
View File

@ -0,0 +1,203 @@
import datetime
from typing import Union, Tuple
from telegram.ext import CallbackContext
from telegram.ext._utils.types import JobCallback
from model.types import JSONDict
class BaseJobHandler:
pass
class RunDailyHandler:
def __init__(self, callback: JobCallback, time: datetime.time, days: Tuple[int, ...] = tuple(range(7)),
data: object = None, name: str = None, chat_id: int = None, user_id: int = None,
job_kwargs: JSONDict = None,):
"""Creates a new :class:`Job` that runs on a daily basis and adds it to the queue.
Note:
For a note about DST, please see the documentation of `APScheduler`_.
.. _`APScheduler`: https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html
#daylight-saving-time-behavior
Args:
callback (:term:`coroutine function`): The callback function that should be executed by
the new job. Callback signature::
async def callback(context: CallbackContext)
time (:obj:`datetime.time`): Time of day at which the job should run. If the timezone
(:obj:`datetime.time.tzinfo`) is :obj:`None`, the default timezone of the bot will
be used, which is UTC unless :attr:`telegram.ext.Defaults.tzinfo` is used.
days (Tuple[:obj:`int`], optional): Defines on which days of the week the job should
run (where ``0-6`` correspond to sunday - saturday). By default, the job will run
every day.
.. versionchanged:: 20.0
Changed day of the week mapping of 0-6 from monday-sunday to sunday-saturday.
data (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through :attr:`Job.data` in the callback. Defaults to
:obj:`None`.
.. versionchanged:: 20.0
Renamed the parameter ``context`` to :paramref:`data`.
name (:obj:`str`, optional): The name of the new job. Defaults to
:external:attr:`callback.__name__ <definition.__name__>`.
chat_id (:obj:`int`, optional): Chat id of the chat associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.chat_data` will
be available in the callback.
.. versionadded:: 20.0
user_id (:obj:`int`, optional): User id of the user associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.user_data` will
be available in the callback.
.. versionadded:: 20.0
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
:meth:`apscheduler.schedulers.base.BaseScheduler.add_job()`.
"""
# 复制文档
self.job_kwargs = job_kwargs
self.user_id = user_id
self.chat_id = chat_id
self.name = name
self.data = data
self.days = days
self.time = time
self.callback = callback
@property
def get_kwargs(self) -> dict:
kwargs = {
"callback": self.callback,
"time": self.time,
"days": self.days,
"data": self.data,
"name": self.name,
"chat_id": self.chat_id,
"user_id": self.callback,
"job_kwargs": self.job_kwargs,
}
return kwargs
class RunRepeatingHandler:
def __init__(self, callback: JobCallback, interval: Union[float, datetime.timedelta],
first: Union[float, datetime.timedelta, datetime.datetime, datetime.time] = None,
last: Union[float, datetime.timedelta, datetime.datetime, datetime.time] = None,
context: object = None, name: str = None, chat_id: int = None, user_id: int = None,
job_kwargs: JSONDict = None):
"""Creates a new :class:`Job` instance that runs at specified intervals and adds it to the
queue.
Note:
For a note about DST, please see the documentation of `APScheduler`_.
.. _`APScheduler`: https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html
#daylight-saving-time-behavior
Args:
callback (:term:`coroutine function`): The callback function that should be executed by
the new job. Callback signature::
async def callback(context: CallbackContext)
interval (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`): The interval in which
the job will run. If it is an :obj:`int` or a :obj:`float`, it will be interpreted
as seconds.
first (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`, optional):
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
* :obj:`int` or :obj:`float` will be interpreted as "seconds from now" in which the
job should run.
* :obj:`datetime.timedelta` will be interpreted as "time from now" in which the
job should run.
* :obj:`datetime.datetime` will be interpreted as a specific date and time at
which the job should run. If the timezone (:attr:`datetime.datetime.tzinfo`) is
:obj:`None`, the default timezone of the bot will be used.
* :obj:`datetime.time` will be interpreted as a specific time of day at which the
job should run. This could be either today or, if the time has already passed,
tomorrow. If the timezone (:attr:`datetime.time.tzinfo`) is :obj:`None`, the
default timezone of the bot will be used, which is UTC unless
:attr:`telegram.ext.Defaults.tzinfo` is used.
Defaults to :paramref:`interval`
last (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`, optional):
Latest possible time for the job to run. This parameter will be interpreted
depending on its type. See :paramref:`first` for details.
If :paramref:`last` is :obj:`datetime.datetime` or :obj:`datetime.time` type
and ``last.tzinfo`` is :obj:`None`, the default timezone of the bot will be
assumed, which is UTC unless :attr:`telegram.ext.Defaults.tzinfo` is used.
Defaults to :obj:`None`.
data (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through :attr:`Job.data` in the callback. Defaults to
:obj:`None`.
.. versionchanged:: 20.0
Renamed the parameter ``context`` to :paramref:`data`.
name (:obj:`str`, optional): The name of the new job. Defaults to
:external:attr:`callback.__name__ <definition.__name__>`.
chat_id (:obj:`int`, optional): Chat id of the chat associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.chat_data` will
be available in the callback.
.. versionadded:: 20.0
user_id (:obj:`int`, optional): User id of the user associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.user_data` will
be available in the callback.
.. versionadded:: 20.0
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
:meth:`apscheduler.schedulers.base.BaseScheduler.add_job()`.
"""
# 复制文档
self.callback = callback
self.interval = interval
self.first = first
self.last = last
self.context = context
self.name = name
self.chat_id = chat_id
self.user_id = user_id
self.job_kwargs = job_kwargs
@property
def get_kwargs(self) -> dict:
kwargs = {
"callback": self.callback,
"interval": self.interval,
"first": self.first,
"last": self.last,
"context": self.context,
"name": self.name,
"chat_id": self.chat_id,
"user_id": self.callback,
"job_kwargs": self.job_kwargs,
}
return kwargs
class BaseJob:
@staticmethod
def remove_job_if_exists(name: str, context: CallbackContext) -> bool:
current_jobs = context.job_queue.get_jobs_by_name(name)
context.job_queue.run_repeating()
if not current_jobs:
return False
for job in current_jobs:
job.schedule_removal()
return True

12
main.py
View File

@ -1,12 +1,14 @@
import asyncio import asyncio
from warnings import filterwarnings from warnings import filterwarnings
from telegram.ext import Application, ContextTypes import pytz
from telegram.ext import Application, ContextTypes, Defaults
from telegram.warnings import PTBUserWarning from telegram.warnings import PTBUserWarning
from utils.base import PaimonContext from utils.base import PaimonContext
from config import config from config import config
from handler import register_handlers from handler import register_handlers, register_job
from logger import Log from logger import Log
from service import StartService from service import StartService
from utils.aiobrowser import AioBrowser from utils.aiobrowser import AioBrowser
@ -44,7 +46,9 @@ def main() -> None:
# 自定义 context 类型 # 自定义 context 类型
context_types = ContextTypes(context=PaimonContext) context_types = ContextTypes(context=PaimonContext)
application = Application.builder().token(config.TELEGRAM["token"]).context_types(context_types).build() defaults = Defaults(tzinfo=pytz.timezone("Asia/Shanghai"))
application = Application.builder().token(config.TELEGRAM["token"]).context_types(context_types).defaults(defaults).build()
# 保存实例化的类到 bot_data # 保存实例化的类到 bot_data
# 这样在每个实例去获取 service 时 # 这样在每个实例去获取 service 时
@ -53,6 +57,8 @@ def main() -> None:
register_handlers(application, service) register_handlers(application, service)
register_job(application, service)
# 启动BOT # 启动BOT
try: try:
Log.info("BOT已经启动 开始处理命令") Log.info("BOT已经启动 开始处理命令")

View File

@ -2,14 +2,16 @@ import os
from glob import glob from glob import glob
from importlib import import_module from importlib import import_module
from os import path from os import path
from typing import List, Union, Tuple, Callable from typing import List, Union, Tuple
from telegram.ext import Application from telegram.ext import Application
from jobs.base import RunDailyHandler
from logger import Log from logger import Log
from service import BaseService from service import BaseService
PluginsClass: List[Tuple[any, dict]] = [] PluginsClass: List[Tuple[object, dict]] = []
JobsClass: List[Tuple[object, dict]] = []
def listener_plugins_class(need_service: bool = False): def listener_plugins_class(need_service: bool = False):
@ -22,7 +24,7 @@ def listener_plugins_class(need_service: bool = False):
"need_service": need_service "need_service": need_service
} }
def decorator(func: Callable): def decorator(func: object):
PluginsClass.append( PluginsClass.append(
(func, plugin_info) (func, plugin_info)
) )
@ -31,6 +33,25 @@ def listener_plugins_class(need_service: bool = False):
return decorator return decorator
def listener_jobs_class(need_service: bool = False):
"""监听JOB
:param need_service: 插件类中 create_handlers 函数是否传入 service
:return: None
"""
job_info = {
"need_service": need_service
}
def decorator(func: object):
JobsClass.append(
(func, job_info)
)
return func
return decorator
class PluginsManager: class PluginsManager:
def __init__(self): def __init__(self):
self.plugin_list: List[str] = [] # 用于存储文件名称 self.plugin_list: List[str] = [] # 用于存储文件名称
@ -91,3 +112,67 @@ class PluginsManager:
Log.error("初始化Class失败", exc) Log.error("初始化Class失败", exc)
finally: finally:
pass pass
class JobsManager:
def __init__(self):
self.job_list: List[str] = [] # 用于存储文件名称
self.exclude_list: List[str] = []
def refresh_list(self, plugin_paths):
self.job_list.clear()
plugin_paths = glob(plugin_paths)
for plugin_path in plugin_paths:
if plugin_path.startswith('__'):
continue
module_name = path.basename(path.normpath(plugin_path))
root, ext = os.path.splitext(module_name)
if ext == ".py":
self.job_list.append(root)
def add_exclude(self, exclude: Union[str, List[str]]):
if isinstance(exclude, str):
self.exclude_list.append(exclude)
elif isinstance(exclude, list):
self.exclude_list.extend(exclude)
else:
raise TypeError
def import_module(self):
for job_name in self.job_list:
if job_name not in self.exclude_list:
try:
import_module(f"jobs.{job_name}")
except ImportError as exc:
Log.warning(f"Job模块 {job_name} 导入失败", exc)
except ImportWarning as exc:
Log.warning(f"Job模块 {job_name} 加载成功但有警告", exc)
except BaseException as exc:
Log.warning(f"Job模块 {job_name} 加载失败", exc)
else:
Log.debug(f"Job模块 {job_name} 加载成功")
@staticmethod
def add_handler(application: Application, service: BaseService):
for pc in JobsClass:
func = pc[0]
plugin_info = pc[1]
# 构建 kwargs
kwargs = {}
if plugin_info.get("need_service", False):
kwargs["service"] = service
if callable(func):
try:
handlers_list = func.build_jobs(**kwargs)
for handler in handlers_list:
if isinstance(handler, RunDailyHandler):
application.job_queue.run_daily(**handler.get_kwargs)
Log.info(f"添加每日Job成功 Job名称[{handler.name}] Job每日执行时间[{handler.time.isoformat()}]")
except AttributeError as exc:
if "build_jobs" in str(exc):
Log.error("build_jobs 函数未找到", exc)
Log.error("初始化Class失败", exc)
except BaseException as exc:
Log.error("初始化Class失败", exc)
finally:
pass

View File

@ -15,4 +15,5 @@ pyppeteer~=1.0.2
lxml>=4.9.0 lxml>=4.9.0
fakeredis>=1.8.1 fakeredis>=1.8.1
aiohttp<=3.8.1 aiohttp<=3.8.1
python-telegram-bot==20.0a2 python-telegram-bot==20.0a2
pytz>=2021.3