From 05021865c7e7af13c2356a2cf6276e5d04d5192d Mon Sep 17 00:00:00 2001 From: xtaodada Date: Wed, 13 Nov 2024 16:06:53 +0800 Subject: [PATCH] feat: support run job apscheduler --- pyproject.toml | 1 + src/core/scheduler.py | 13 +++++++ src/plugin/__init__.py | 1 + src/plugin/_job.py | 86 ++++++++++++++++++++++++++++++++++++++++++ src/plugin/plugin.py | 31 ++++++++++++++- uv.lock | 57 +++++++++++++++++++++++++++- 6 files changed, 187 insertions(+), 2 deletions(-) create mode 100644 src/core/scheduler.py create mode 100644 src/plugin/_job.py diff --git a/pyproject.toml b/pyproject.toml index 93bfb4a..0c1e353 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.11" dependencies = [ "aiofiles>=24.1.0", "alembic>=1.13.3", + "apscheduler>=3.10.4", "asyncmy>=0.2.9", "black>=24.10.0", "fakeredis>=2.26.1", diff --git a/src/core/scheduler.py b/src/core/scheduler.py new file mode 100644 index 0000000..61628f6 --- /dev/null +++ b/src/core/scheduler.py @@ -0,0 +1,13 @@ +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from persica.factory.component import AsyncInitializingComponent + + +class TimeScheduler(AsyncInitializingComponent): + def __init__(self): + self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai") + + async def initialize(self): + self.scheduler.start() + + async def shutdown(self): + self.scheduler.shutdown() diff --git a/src/plugin/__init__.py b/src/plugin/__init__.py index 75b2916..89e6e4c 100644 --- a/src/plugin/__init__.py +++ b/src/plugin/__init__.py @@ -1 +1,2 @@ from ._handler import handler +from ._job import job diff --git a/src/plugin/_job.py b/src/plugin/_job.py new file mode 100644 index 0000000..90e89f8 --- /dev/null +++ b/src/plugin/_job.py @@ -0,0 +1,86 @@ +from dataclasses import dataclass +from typing import Any, ParamSpec, TypeVar, Callable + +P = ParamSpec("P") +R = TypeVar("R") + +_JOB_ATTR_NAME = "_job_data" + + +@dataclass(init=True) +class JobData: + trigger: str + name: str + kwargs: Any = None + + +class _Job: + def __init__( + self, + trigger: str, + name: str, + **kwargs, + ): + self.trigger = trigger + self.name = name + self.kwargs = kwargs + + def __call__(self, func: Callable[P, R]) -> Callable[P, R]: + data = JobData( + trigger=self.trigger, + name=self.name, + kwargs=self.kwargs, + ) + if hasattr(func, _JOB_ATTR_NAME): + job_datas = getattr(func, _JOB_ATTR_NAME) + job_datas.append(data) + setattr(func, _JOB_ATTR_NAME, job_datas) + else: + setattr(func, _JOB_ATTR_NAME, [data]) + return func + + +class _RunCron(_Job): + def __init__( + self, + name: str, + **kwargs, + ): + super().__init__( + trigger="cron", + name=name, + **kwargs, + ) + + +class _RunInterval(_Job): + def __init__( + self, + name: str, + **kwargs, + ): + super().__init__( + trigger="interval", + name=name, + **kwargs, + ) + + +class _RunDate(_Job): + def __init__( + self, + name: str, + **kwargs, + ): + super().__init__( + trigger="date", + name=name, + **kwargs, + ) + + +# noinspection PyPep8Naming +class job: + cron = _RunCron + interval = _RunInterval + date = _RunDate diff --git a/src/plugin/plugin.py b/src/plugin/plugin.py index a989aca..ea2891d 100644 --- a/src/plugin/plugin.py +++ b/src/plugin/plugin.py @@ -3,11 +3,14 @@ from typing import List, ClassVar, TYPE_CHECKING, Optional from types import MethodType from multiprocessing import RLock as Lock +from apscheduler.job import Job from persica.factory.component import AsyncInitializingComponent from src.app import app +from src.core.scheduler import TimeScheduler from src.core.web_app import WebApp from src.plugin._handler import HandlerData +from src.plugin._job import JobData, _JOB_ATTR_NAME from src.services.users.services import UserServices if TYPE_CHECKING: @@ -28,6 +31,7 @@ class Plugin(AsyncInitializingComponent): _prefix: Optional[str] = None _handlers: Optional[List[HandlerData]] = None + _jobs: Optional[List[Job]] = None @property def handlers(self) -> List[HandlerData]: @@ -48,6 +52,31 @@ class Plugin(AsyncInitializingComponent): self._handlers.append(data) return self._handlers + def _install_jobs(self) -> None: + if self._jobs is None: + self._jobs = [] + sche: TimeScheduler = app.factory.get_object(TimeScheduler) + for attr in dir(self): + # noinspection PyUnboundLocalVariable + if ( + not (attr.startswith("_") or attr in _EXCLUDE_ATTRS) + and isinstance(func := getattr(self, attr), MethodType) + and (datas := getattr(func, _JOB_ATTR_NAME, [])) + ): + for data in datas: + data: "JobData" + self._jobs.append( + sche.scheduler.add_job(func, data.trigger, **data.kwargs) + ) + + @property + def jobs(self) -> List[Job]: + with self._lock: + if self._jobs is None: + self._jobs = [] + self._install_jobs() + return self._jobs + async def initialize(self): await self.install() @@ -59,7 +88,7 @@ class Plugin(AsyncInitializingComponent): web: WebApp = app.factory.get_object(WebApp) auth: UserServices = app.factory.get_object(UserServices) - # self._install_jobs() + self._install_jobs() for h in self.handlers: dep = h.kwargs.get("dependencies", []) diff --git a/uv.lock b/uv.lock index 985cf25..95201e7 100644 --- a/uv.lock +++ b/uv.lock @@ -50,6 +50,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e4/f5/f2b75d2fc6f1a260f340f0e7c6a060f4dd2961cc16884ed851b0d18da06a/anyio-4.6.2.post1-py3-none-any.whl", hash = "sha256:6d170c36fba3bdd840c73d3868c1e777e33676a69c3a72cf0a0d5d6d8009b61d", size = 90377 }, ] +[[package]] +name = "apscheduler" +version = "3.10.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytz" }, + { name = "six" }, + { name = "tzlocal" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/34/5dcb368cf89f93132d9a31bd3747962a9dc874480e54333b0c09fa7d56ac/APScheduler-3.10.4.tar.gz", hash = "sha256:e6df071b27d9be898e486bc7940a7be50b4af2e9da7c08f0744a96d4bd4cef4a", size = 100832 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/b5/7af0cb920a476dccd612fbc9a21a3745fb29b1fcd74636078db8f7ba294c/APScheduler-3.10.4-py3-none-any.whl", hash = "sha256:fb91e8a768632a4756a585f79ec834e0e27aad5860bac7eaa523d9ccefd87661", size = 59303 }, +] + [[package]] name = "async-timeout" version = "5.0.0" @@ -520,6 +534,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b4/fb/275137a799169392f1fa88fff2be92f16eee38e982720a8aaadefc4a36b2/python_multipart-0.0.17-py3-none-any.whl", hash = "sha256:15dc4f487e0a9476cc1201261188ee0940165cffc94429b6fc565c4d3045cb5d", size = 24453 }, ] +[[package]] +name = "pytz" +version = "2024.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/3a/31/3c70bf7603cc2dca0f19bdc53b4537a797747a58875b552c8c413d963a3f/pytz-2024.2.tar.gz", hash = "sha256:2aa355083c50a0f93fa581709deac0c9ad65cca8a9e9beac660adcbd493c798a", size = 319692 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/11/c3/005fcca25ce078d2cc29fd559379817424e94885510568bc1bc53d7d5846/pytz-2024.2-py2.py3-none-any.whl", hash = "sha256:31c7c1817eb7fae7ca4b8c7ee50c72f93aa2dd863de768e1ef4245d426aa0725", size = 508002 }, +] + [[package]] name = "redis" version = "5.2.0" @@ -541,6 +564,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/e9/e58082fbb8cecbb6fb4133033c40cc50c248b1a331582be3a0f39138d65b/simpleeval-1.0.3-py3-none-any.whl", hash = "sha256:e3bdbb8c82c26297c9a153902d0fd1858a6c3774bf53ff4f134788c3f2035c38", size = 15762 }, ] +[[package]] +name = "six" +version = "1.16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/71/39/171f1c67cd00715f190ba0b100d606d440a28c93c7714febeca8b79af85e/six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", size = 34041 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d9/5a/e7c31adbe875f2abbb91bd84cf2dc52d792b5a01506781dbcf25c91daf11/six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254", size = 11053 }, +] + [[package]] name = "sniffio" version = "1.3.1" @@ -642,6 +674,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/26/9f/ad63fc0248c5379346306f8668cda6e2e2e9c95e01216d2b8ffd9ff037d0/typing_extensions-4.12.2-py3-none-any.whl", hash = "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d", size = 37438 }, ] +[[package]] +name = "tzdata" +version = "2024.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/e1/34/943888654477a574a86a98e9896bae89c7aa15078ec29f490fef2f1e5384/tzdata-2024.2.tar.gz", hash = "sha256:7d85cc416e9382e69095b7bdf4afd9e3880418a2413feec7069d533d6b4e31cc", size = 193282 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a6/ab/7e5f53c3b9d14972843a647d8d7a853969a58aecc7559cb3267302c94774/tzdata-2024.2-py2.py3-none-any.whl", hash = "sha256:a48093786cdcde33cad18c2555e8532f34422074448fbc874186f0abd79565cd", size = 346586 }, +] + +[[package]] +name = "tzlocal" +version = "5.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "tzdata", marker = "platform_system == 'Windows'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/04/d3/c19d65ae67636fe63953b20c2e4a8ced4497ea232c43ff8d01db16de8dc0/tzlocal-5.2.tar.gz", hash = "sha256:8d399205578f1a9342816409cc1e46a93ebd5755e39ea2d85334bea911bf0e6e", size = 30201 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/97/3f/c4c51c55ff8487f2e6d0e618dba917e3c3ee2caae6cf0fbb59c9b1876f2e/tzlocal-5.2-py3-none-any.whl", hash = "sha256:49816ef2fe65ea8ac19d19aa7a1ae0551c834303d5014c6d5a62e4cbda8047b8", size = 17859 }, +] + [[package]] name = "uvicorn" version = "0.32.0" @@ -662,6 +715,7 @@ source = { virtual = "." } dependencies = [ { name = "aiofiles" }, { name = "alembic" }, + { name = "apscheduler" }, { name = "asyncmy" }, { name = "black" }, { name = "fakeredis" }, @@ -679,8 +733,9 @@ dependencies = [ [package.metadata] requires-dist = [ - { name = "aiofiles" }, + { name = "aiofiles", specifier = ">=24.1.0" }, { name = "alembic", specifier = ">=1.13.3" }, + { name = "apscheduler" }, { name = "asyncmy", specifier = ">=0.2.9" }, { name = "black", specifier = ">=24.10.0" }, { name = "fakeredis", specifier = ">=2.26.1" },