PaiGram/utils/log/_logger.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

205 lines
6.5 KiB
Python
Raw Normal View History

import inspect
import io
import logging
import os
import traceback as traceback_
from multiprocessing import RLock as Lock
from pathlib import Path
2022-10-23 09:15:09 +00:00
from types import TracebackType
from typing import (
Any,
Callable,
List,
Mapping,
Optional,
TYPE_CHECKING,
Tuple,
Type,
Union,
)
2022-10-13 13:01:45 +00:00
from typing_extensions import Self
2022-10-23 09:15:09 +00:00
from utils.log._handler import (
FileHandler,
Handler,
)
from utils.typedefs import LogFilterType
if TYPE_CHECKING:
2022-10-23 09:15:09 +00:00
from utils.log._config import LoggerConfig # pylint: disable=unused-import
from logging import LogRecord # pylint: disable=unused-import
2022-10-23 09:15:09 +00:00
# Names exported by ``from <this module> import *``.
__all__ = ["Logger", "LogFilter"]
# Exception triple: either (exception type, exception instance, optional
# traceback) or three ``None`` values when no exception is active.
SysExcInfoType = Union[
Tuple[Type[BaseException], BaseException, Optional[TracebackType]],
Tuple[None, None, None],
]
# Everything accepted by the ``exc_info`` keyword of the logging methods in
# this module: a flag, an exception triple, or an exception instance.
ExceptionInfoType = Union[bool, SysExcInfoType, BaseException]
# Re-entrant lock held by ``Logger.__new__`` while it creates the shared instance.
_lock = Lock()
2022-10-23 09:15:09 +00:00
# Unique sentinel used as the default ``msg`` of ``Logger.exception``; when the
# caller omits the message, the method substitutes an empty string for it.
NONE = object()
class Logger(logging.Logger):
    """Singleton ``logging.Logger`` with a console handler and two file handlers.

    ``__new__`` always returns the same object. ``__init__`` configures that
    object from a ``LoggerConfig``: it builds a console ``Handler``, a
    ``debug/debug.log`` ``FileHandler`` and an ``error/error.log``
    ``FileHandler``, passes them to ``logging.basicConfig`` and attaches them
    to this logger.
    """

    _instance: Optional["Logger"] = None

    def __new__(cls, *args, **kwargs) -> "Logger":
        """Return the shared instance, creating it under ``_lock`` on first use."""
        with _lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, config: Optional["LoggerConfig"] = None) -> None:
        """Configure the logger and its handlers.

        Args:
            config: Logger settings. When omitted (or falsy) a default
                ``LoggerConfig()`` is used.

        NOTE(review): ``__new__`` returns the same object every time, so each
        ``Logger(...)`` call re-runs this initializer on the shared instance.
        Confirm that this is intended.
        """
        # Imported lazily; at module level this name is only imported under
        # TYPE_CHECKING.
        from utils.log._config import LoggerConfig

        self.config = config or LoggerConfig()
        cfg = self.config

        default_level = logging.DEBUG if cfg.debug else logging.INFO
        super().__init__(
            name=cfg.name,
            level=default_level if cfg.level is None else cfg.level,
        )

        log_path = Path(cfg.project_root).joinpath(cfg.log_path)

        # Keyword arguments that are identical for all three handlers.
        shared = {
            "width": cfg.width,
            "locals_max_length": cfg.traceback_locals_max_length,
            "locals_max_string": cfg.traceback_locals_max_string,
            "project_root": cfg.project_root,
            "log_time_format": cfg.time_format,
        }

        # Console log configuration.
        handler = Handler(
            locals_max_depth=cfg.traceback_locals_max_depth,
            **shared,
        )
        # debug.log configuration (locals depth is fixed to 1 for this file).
        debug_handler = FileHandler(
            level=logging.DEBUG,
            path=log_path.joinpath("debug/debug.log"),
            locals_max_depth=1,
            **shared,
        )
        # error.log configuration.
        error_handler = FileHandler(
            level=logging.ERROR,
            path=log_path.joinpath("error/error.log"),
            locals_max_depth=cfg.traceback_locals_max_depth,
            **shared,
        )

        logging.basicConfig(
            level=default_level,
            format="%(message)s",
            datefmt=cfg.time_format,
            handlers=[handler, debug_handler, error_handler],
        )

        # Read the flag from ``self.config``: the ``config`` argument may be
        # ``None``, in which case ``config.capture_warnings`` would raise
        # AttributeError.
        if cfg.capture_warnings:
            logging.captureWarnings(True)
            warnings_logger = logging.getLogger("py.warnings")
            warnings_logger.addHandler(handler)
            warnings_logger.addHandler(debug_handler)

        self.addHandler(handler)
        self.addHandler(debug_handler)
        self.addHandler(error_handler)

    def success(
        self,
        msg: Any,
        *args: Any,
        exc_info: Optional[ExceptionInfoType] = None,
        stack_info: bool = False,
        stacklevel: int = 1,
        extra: Optional[Mapping[str, Any]] = None,
    ) -> None:
        """Log *msg* at numeric level 25 (between ``INFO`` = 20 and ``WARNING`` = 30).

        All arguments are forwarded unchanged to ``self.log``.
        """
        return self.log(
            25,
            msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def exception(
        self,
        msg: Any = NONE,
        *args: Any,
        exc_info: Optional[ExceptionInfoType] = True,
        stack_info: bool = False,
        stacklevel: int = 1,
        extra: Optional[Mapping[str, Any]] = None,
        **kwargs,
    ) -> None:  # pylint: disable=W1113
        """Log an exception, with the message being optional.

        Unlike ``logging.Logger.exception``, *msg* may be omitted; an empty
        string is logged in that case. Extra ``**kwargs`` are accepted but not
        forwarded to the parent implementation.
        """
        super().exception(
            "" if msg is NONE else msg,
            *args,
            exc_info=exc_info,
            stack_info=stack_info,
            stacklevel=stacklevel,
            extra=extra,
        )

    def findCaller(self, stack_info: bool = False, stacklevel: int = 1) -> Tuple[str, int, str, Optional[str]]:
        """Locate the frame that issued the logging call.

        Walks back ``stacklevel - 1`` frames from this method's caller, then
        keeps walking past frames whose code lives in this file or in the file
        that defines ``logging.addLevelName``.

        Args:
            stack_info: When true, also render the stack of the found frame.
            stacklevel: Number of frames to skip before searching.

        Returns:
            ``(filename, line number, function name, stack text or None)``.
            Placeholders are returned when no suitable frame is found.
        """
        frame = inspect.currentframe()
        if frame is not None:
            frame = frame.f_back

        original_frame = frame
        while frame and stacklevel > 1:
            frame = frame.f_back
            stacklevel -= 1
        if not frame:
            # stacklevel ran past the top of the stack; restart from the caller.
            frame = original_frame

        # Files whose frames are never reported as the caller; computed once
        # rather than on every loop iteration.
        skipped_files = (
            os.path.normcase(Path(__file__).resolve()),
            os.path.normcase(logging.addLevelName.__code__.co_filename),
        )

        rv = "(unknown file)", 0, "(unknown function)", None
        while hasattr(frame, "f_code"):
            code = frame.f_code
            if os.path.normcase(code.co_filename) in skipped_files:
                frame = frame.f_back
                continue

            sinfo = None
            if stack_info:
                with io.StringIO() as sio:
                    sio.write("Stack (most recent call last):\n")
                    traceback_.print_stack(frame, file=sio)
                    sinfo = sio.getvalue()
                if sinfo.endswith("\n"):
                    sinfo = sinfo[:-1]

            rv = (code.co_filename, frame.f_lineno, code.co_name, sinfo)
            break
        return rv

    def addFilter(self, log_filter: LogFilterType) -> None:  # pylint: disable=arguments-differ
        """Attach *log_filter* to every handler currently registered on this logger.

        The filter is added to the handlers, not to the logger itself, so
        handlers added afterwards do not receive it.
        """
        for handler in self.handlers:
            handler.addFilter(log_filter)
2022-10-13 13:01:45 +00:00
class LogFilter(logging.Filter):
    """Filter that passes a record only when every registered predicate accepts it.

    ``filter`` is fully overridden, so the name-based matching of
    ``logging.Filter`` is not applied.
    """

    _filter_list: List[Callable[["LogRecord"], bool]]

    def __init__(self, name: str = ""):
        """Create a filter with no predicates registered.

        Args:
            name: Passed through to ``logging.Filter``.
        """
        super().__init__(name=name)
        # Created per instance: a mutable list on the class would be shared,
        # so a predicate added to one filter would affect every other filter.
        self._filter_list = []

    def add_filter(self, f: Callable[["LogRecord"], bool]) -> Self:
        """Register predicate *f* (ignored if already present) and return ``self`` for chaining."""
        if f not in self._filter_list:
            self._filter_list.append(f)
        return self

    def filter(self, record: "LogRecord") -> bool:
        """Return ``True`` when all registered predicates accept *record*.

        With no predicates registered, every record is accepted.
        """
        return all(func(record) for func in self._filter_list)