PaiGram/core/builtins/executor.py

132 lines
4.5 KiB
Python
Raw Normal View History

"""执行器"""
import inspect
from functools import cached_property
from multiprocessing import RLock as Lock
from typing import Callable, ClassVar, Dict, Generic, Optional, TYPE_CHECKING, Type, TypeVar
from telegram import Update
from telegram.ext import CallbackContext
from typing_extensions import ParamSpec, Self
from core.builtins.contexts import handler_contexts, job_contexts
if TYPE_CHECKING:
from core.application import Application
from core.builtins.dispatcher import AbstractDispatcher, HandlerDispatcher
from multiprocessing.synchronize import RLock as LockType
__all__ = ("BaseExecutor", "Executor", "HandlerExecutor", "JobExecutor")
T = TypeVar("T")
R = TypeVar("R")
P = ParamSpec("P")
class BaseExecutor:
"""执行器
Args:
name(str): 该执行器的名称执行器的名称是唯一的
只支持执行只拥有 POSITIONAL_OR_KEYWORD KEYWORD_ONLY 两种参数类型的函数
"""
_lock: ClassVar["LockType"] = Lock()
_instances: ClassVar[Dict[str, Self]] = {}
_application: "Optional[Application]" = None
def set_application(self, application: "Application") -> None:
self._application = application
@property
def application(self) -> "Application":
if self._application is None:
raise RuntimeError(f"No application was set for this {self.__class__.__name__}.")
return self._application
def __new__(cls: Type[T], name: str, *args, **kwargs) -> T:
with cls._lock:
if (instance := cls._instances.get(name)) is None:
instance = object.__new__(cls)
instance.__init__(name, *args, **kwargs)
cls._instances.update({name: instance})
return instance
@cached_property
def name(self) -> str:
"""当前执行器的名称"""
return self._name
def __init__(self, name: str, dispatcher: Optional[Type["AbstractDispatcher"]] = None) -> None:
self._name = name
self._dispatcher = dispatcher
class Executor(BaseExecutor, Generic[P, R]):
async def __call__(
self,
target: Callable[P, R],
dispatcher: Optional[Type["AbstractDispatcher"]] = None,
**kwargs,
) -> R:
dispatcher = self._dispatcher or dispatcher
dispatcher_instance = dispatcher(**kwargs)
dispatcher_instance.set_application(application=self.application)
dispatched_func = dispatcher_instance.dispatch(target) # 分发参数,组成新函数
# 执行
if inspect.iscoroutinefunction(target):
result = await dispatched_func()
else:
result = dispatched_func()
return result
class HandlerExecutor(BaseExecutor, Generic[P, R]):
"""Handler专用执行器"""
_callback: Callable[P, R]
_dispatcher: "HandlerDispatcher"
def __init__(self, func: Callable[P, R], dispatcher: Optional[Type["HandlerDispatcher"]] = None) -> None:
if dispatcher is None:
from core.builtins.dispatcher import HandlerDispatcher
dispatcher = HandlerDispatcher
super().__init__("handler", dispatcher)
self._callback = func
self._dispatcher = dispatcher()
def set_application(self, application: "Application") -> None:
self._application = application
if self._dispatcher is not None:
self._dispatcher.set_application(application)
async def __call__(self, update: Update, context: CallbackContext) -> R:
with handler_contexts(update, context):
dispatched_func = self._dispatcher.dispatch(self._callback, update=update, context=context)
return await dispatched_func()
class JobExecutor(BaseExecutor):
"""Job 专用执行器"""
def __init__(self, func: Callable[P, R], dispatcher: Optional[Type["AbstractDispatcher"]] = None) -> None:
if dispatcher is None:
from core.builtins.dispatcher import JobDispatcher
dispatcher = JobDispatcher
super().__init__("job", dispatcher)
self._callback = func
self._dispatcher = dispatcher()
def set_application(self, application: "Application") -> None:
self._application = application
if self._dispatcher is not None:
self._dispatcher.set_application(application)
async def __call__(self, context: CallbackContext) -> R:
with job_contexts(context):
dispatched_func = self._dispatcher.dispatch(self._callback, context=context)
return await dispatched_func()