♻ 重写 job_queue 注册

This commit is contained in:
洛水居室 2022-08-04 20:58:12 +08:00
parent f735bd6fd7
commit 8caccc6d79
No known key found for this signature in database
GPG Key ID: C9DE87DA724B88FC
3 changed files with 4 additions and 200 deletions

View File

@ -11,7 +11,6 @@ import datetime
from telegram.ext import CallbackContext from telegram.ext import CallbackContext
from jobs.base import RunDailyHandler
from logger import Log from logger import Log
from utils.job.manager import listener_jobs_class from utils.job.manager import listener_jobs_class
@ -19,13 +18,11 @@ from utils.job.manager import listener_jobs_class
class JobTest: class JobTest:
@classmethod @classmethod
def build_jobs(cls) -> list: def build_jobs(cls, job_queue: JobQueue):
test = cls() test = cls()
# 注册每日执行任务 # 注册每日执行任务
# 执行时间为21点45分 # 执行时间为21点45分
return [ job_queue.run_daily(test.test, datetime.time(21, 45, 00), name="测试Job")
RunDailyHandler(test.test, datetime.time(21, 45, 00), name="测试Job")
]
async def test(self, context: CallbackContext): async def test(self, context: CallbackContext):
Log.info("测试Job[OK]") Log.info("测试Job[OK]")

View File

@ -1,200 +1,11 @@
import datetime
from typing import Union, Tuple
from telegram.ext import CallbackContext from telegram.ext import CallbackContext
from model.types import JSONDict, Func
class BaseJobHandler:
pass
class RunDailyHandler:
def __init__(self, callback: Func, time: datetime.time, days: Tuple[int, ...] = tuple(range(7)),
data: object = None, name: str = None, chat_id: int = None, user_id: int = None,
job_kwargs: JSONDict = None, ):
"""Creates a new :class:`Job` that runs on a daily basis and adds it to the queue.
Note:
For a note about DST, please see the documentation of `APScheduler`_.
.. _`APScheduler`: https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html
#daylight-saving-time-behavior
Args:
callback (:term:`coroutine function`): The callback function that should be executed by
the new job. Callback signature::
async def callback(context: CallbackContext)
time (:obj:`datetime.time`): Time of day at which the job should run. If the timezone
(:obj:`datetime.time.tzinfo`) is :obj:`None`, the default timezone of the bot will
be used, which is UTC unless :attr:`telegram.ext.Defaults.tzinfo` is used.
days (Tuple[:obj:`int`], optional): Defines on which days of the week the job should
run (where ``0-6`` correspond to sunday - saturday). By default, the job will run
every day.
.. versionchanged:: 20.0
Changed day of the week mapping of 0-6 from monday-sunday to sunday-saturday.
data (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through :attr:`Job.data` in the callback. Defaults to
:obj:`None`.
.. versionchanged:: 20.0
Renamed the parameter ``context`` to :paramref:`data`.
name (:obj:`str`, optional): The name of the new job. Defaults to
:external:attr:`callback.__name__ <definition.__name__>`.
chat_id (:obj:`int`, optional): Chat id of the chat associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.chat_data` will
be available in the callback.
.. versionadded:: 20.0
user_id (:obj:`int`, optional): User id of the user associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.user_data` will
be available in the callback.
.. versionadded:: 20.0
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
:meth:`apscheduler.schedulers.base.BaseScheduler.add_job()`.
"""
# 复制文档
self.job_kwargs = job_kwargs
self.user_id = user_id
self.chat_id = chat_id
self.name = name
self.data = data
self.days = days
self.time = time
self.callback = callback
@property
def get_kwargs(self) -> dict:
kwargs = {
"callback": self.callback,
"time": self.time,
"days": self.days,
"data": self.data,
"name": self.name,
"chat_id": self.chat_id,
"user_id": self.callback,
"job_kwargs": self.job_kwargs,
}
return kwargs
class RunRepeatingHandler:
def __init__(self, callback: Func, interval: Union[float, datetime.timedelta],
first: Union[float, datetime.timedelta, datetime.datetime, datetime.time] = None,
last: Union[float, datetime.timedelta, datetime.datetime, datetime.time] = None,
context: object = None, name: str = None, chat_id: int = None, user_id: int = None,
job_kwargs: JSONDict = None):
"""Creates a new :class:`Job` instance that runs at specified intervals and adds it to the
queue.
Note:
For a note about DST, please see the documentation of `APScheduler`_.
.. _`APScheduler`: https://apscheduler.readthedocs.io/en/stable/modules/triggers/cron.html
#daylight-saving-time-behavior
Args:
callback (:term:`coroutine function`): The callback function that should be executed by
the new job. Callback signature::
async def callback(context: CallbackContext)
interval (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta`): The interval in which
the job will run. If it is an :obj:`int` or a :obj:`float`, it will be interpreted
as seconds.
first (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`, optional):
Time in or at which the job should run. This parameter will be interpreted
depending on its type.
* :obj:`int` or :obj:`float` will be interpreted as "seconds from now" in which the
job should run.
* :obj:`datetime.timedelta` will be interpreted as "time from now" in which the
job should run.
* :obj:`datetime.datetime` will be interpreted as a specific date and time at
which the job should run. If the timezone (:attr:`datetime.datetime.tzinfo`) is
:obj:`None`, the default timezone of the bot will be used.
* :obj:`datetime.time` will be interpreted as a specific time of day at which the
job should run. This could be either today or, if the time has already passed,
tomorrow. If the timezone (:attr:`datetime.time.tzinfo`) is :obj:`None`, the
default timezone of the bot will be used, which is UTC unless
:attr:`telegram.ext.Defaults.tzinfo` is used.
Defaults to :paramref:`interval`
last (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \
:obj:`datetime.datetime` | :obj:`datetime.time`, optional):
Latest possible time for the job to run. This parameter will be interpreted
depending on its type. See :paramref:`first` for details.
If :paramref:`last` is :obj:`datetime.datetime` or :obj:`datetime.time` type
and ``last.tzinfo`` is :obj:`None`, the default timezone of the bot will be
assumed, which is UTC unless :attr:`telegram.ext.Defaults.tzinfo` is used.
Defaults to :obj:`None`.
data (:obj:`object`, optional): Additional data needed for the callback function.
Can be accessed through :attr:`Job.data` in the callback. Defaults to
:obj:`None`.
.. versionchanged:: 20.0
Renamed the parameter ``context`` to :paramref:`data`.
name (:obj:`str`, optional): The name of the new job. Defaults to
:external:attr:`callback.__name__ <definition.__name__>`.
chat_id (:obj:`int`, optional): Chat id of the chat associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.chat_data` will
be available in the callback.
.. versionadded:: 20.0
user_id (:obj:`int`, optional): User id of the user associated with this job. If
passed, the corresponding :attr:`~telegram.ext.CallbackContext.user_data` will
be available in the callback.
.. versionadded:: 20.0
job_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to pass to the
:meth:`apscheduler.schedulers.base.BaseScheduler.add_job()`.
"""
# 复制文档
self.callback = callback
self.interval = interval
self.first = first
self.last = last
self.context = context
self.name = name
self.chat_id = chat_id
self.user_id = user_id
self.job_kwargs = job_kwargs
@property
def get_kwargs(self) -> dict:
kwargs = {
"callback": self.callback,
"interval": self.interval,
"first": self.first,
"last": self.last,
"context": self.context,
"name": self.name,
"chat_id": self.chat_id,
"user_id": self.callback,
"job_kwargs": self.job_kwargs,
}
return kwargs
class BaseJob: class BaseJob:
@staticmethod @staticmethod
def remove_job_if_exists(name: str, context: CallbackContext) -> bool: def remove_job_if_exists(name: str, context: CallbackContext) -> bool:
current_jobs = context.job_queue.get_jobs_by_name(name) current_jobs = context.job_queue.get_jobs_by_name(name)
context.job_queue.run_repeating()
if not current_jobs: if not current_jobs:
return False return False
for job in current_jobs: for job in current_jobs:

View File

@ -6,7 +6,6 @@ from typing import List, Union
from telegram.ext import Application from telegram.ext import Application
from jobs.base import RunDailyHandler
from logger import Log from logger import Log
JobsClass: List[object] = [] JobsClass: List[object] = []
@ -67,11 +66,8 @@ class JobsManager:
for func in JobsClass: for func in JobsClass:
if callable(func): if callable(func):
try: try:
handlers_list = func.build_jobs() func.build_jobs(application.job_queue)
for handler in handlers_list: # Log.info(f"添加每日Job成功 Job名称[{handler.name}] Job每日执行时间[{handler.time.isoformat()}]")
if isinstance(handler, RunDailyHandler):
application.job_queue.run_daily(**handler.get_kwargs)
Log.info(f"添加每日Job成功 Job名称[{handler.name}] Job每日执行时间[{handler.time.isoformat()}]")
except AttributeError as exc: except AttributeError as exc:
if "build_jobs" in str(exc): if "build_jobs" in str(exc):
Log.error("build_jobs 函数未找到", exc) Log.error("build_jobs 函数未找到", exc)