diff --git a/.env.example b/.env.example index f9cf556..0bfdde1 100644 --- a/.env.example +++ b/.env.example @@ -29,11 +29,19 @@ ADMINS=[{ "username": "", "user_id": -1 }] # VERIFY_GROUPS=[] # logger 配置 可选配置项 +# 打印时的宽度 LOGGER_WIDTH=180 +# log 文件存放目录 LOGGER_LOG_PATH="logs" +# log 时间格式,参考 datetime.strftime LOGGER_TIME_FORMAT="[%Y-%m-%d %X]" -LOGGER_TRACEBACK_MAX_FRAMES=20 +# log 高亮关键词 LOGGER_RENDER_KEYWORDS=["BOT"] +# traceback 相关配置 +LOGGER_TRACEBACK_MAX_FRAMES=20 +LOGGER_LOCALS_MAX_DEPTH=0 +LOCALS_LOCALS_MAX_LENGTH=10 +LOCALS_LOCALS_MAX_STRING=80 # mtp 客户端 可选配置项 # API_ID=12345 diff --git a/core/base/webserver.py b/core/base/webserver.py index 28a3840..7cd5070 100644 --- a/core/base/webserver.py +++ b/core/base/webserver.py @@ -1,4 +1,5 @@ import asyncio + import uvicorn from fastapi import FastAPI @@ -35,12 +36,7 @@ class WebServer(Service): self.port = port self.server = uvicorn.Server( - uvicorn.Config( - app=webapp, - port=port, - use_colors=False, - host=host, - ) + uvicorn.Config(app=webapp, port=port, use_colors=False, host=host, log_config=None) ) async def start(self): diff --git a/core/config.py b/core/config.py index 64d25f6..9851e42 100644 --- a/core/config.py +++ b/core/config.py @@ -7,15 +7,12 @@ from typing import ( import dotenv import ujson as json -from pydantic import ( - BaseModel, - BaseSettings, -) - -__all__ = ["BotConfig", "config"] +from pydantic import BaseModel, BaseSettings, validator from utils.const import PROJECT_ROOT +__all__ = ["BotConfig", "config"] + dotenv.load_dotenv() @@ -47,6 +44,9 @@ class BotConfig(BaseSettings): logger_time_format: str = "[%Y-%m-%d %X]" logger_traceback_max_frames: int = 20 logger_render_keywords: List[str] = ["BOT"] + logger_locals_max_depth: Optional[int] = 0 + logger_locals_max_length: int = 10 + logger_locals_max_string: int = 80 enka_network_api_agent: str = "" pass_challenge_api: str = "" @@ -87,6 +87,9 @@ class BotConfig(BaseSettings): path=PROJECT_ROOT.joinpath(self.logger_log_path).resolve(), time_format=self.logger_time_format, render_keywords=self.logger_render_keywords, + locals_max_length=self.logger_locals_max_length, + locals_max_string=self.logger_locals_max_string, + locals_max_depth=self.logger_locals_max_depth, ) @property @@ -134,6 +137,15 @@ class LoggerConfig(BaseModel): traceback_max_frames: int = 20 path: Path = PROJECT_ROOT / "logs" render_keywords: List[str] = ["BOT"] + locals_max_length: int = 10 + locals_max_string: int = 80 + locals_max_depth: Optional[int] = None + + @validator("locals_max_depth", pre=True, check_fields=False) + def locals_max_depth_validator(cls, value) -> Optional[int]: # pylint: disable=R0201 + if value <= 0: + return None + return value class MTProtoConfig(BaseModel): diff --git a/utils/log/_handler.py b/utils/log/_handler.py new file mode 100644 index 0000000..d774a28 --- /dev/null +++ b/utils/log/_handler.py @@ -0,0 +1,270 @@ +import logging +import os +import sys +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Iterable, List, Literal, Optional, TYPE_CHECKING, Union + +import ujson as json +from rich.console import ( + Console, +) +from rich.logging import ( + LogRender as DefaultLogRender, + RichHandler as DefaultRichHandler, +) +from rich.table import Table +from rich.text import ( + Text, + TextType, +) +from rich.theme import Theme +from ujson import JSONDecodeError + +from core.config import config +from utils.const import PROJECT_ROOT +from utils.log._file import FileIO +from utils.log._style import DEFAULT_STYLE +from utils.log._traceback import Traceback + +if TYPE_CHECKING: + from rich.console import ( # pylint: disable=unused-import + ConsoleRenderable, + RenderableType, + ) + from logging import LogRecord # pylint: disable=unused-import + +__all__ = ["LogRender", "Handler", "FileHandler"] + +FormatTimeCallable = Callable[[datetime], Text] + +logging.addLevelName(5, "TRACE") +logging.addLevelName(25, "SUCCESS") +color_system: Literal["windows", "truecolor"] +if sys.platform == "win32": + color_system = "windows" +else: + color_system = "truecolor" +# noinspection SpellCheckingInspection +log_console = Console(color_system=color_system, theme=Theme(DEFAULT_STYLE), width=config.logger.width) + + +class LogRender(DefaultLogRender): + @property + def last_time(self): + return self._last_time + + @last_time.setter + def last_time(self, last_time): + self._last_time = last_time + + def __init__(self, *args, **kwargs): + super(LogRender, self).__init__(*args, **kwargs) + self.show_level = True + self.time_format = config.logger.time_format + + def __call__( + self, + console: "Console", + renderables: Iterable["ConsoleRenderable"], + log_time: Optional[datetime] = None, + time_format: Optional[Union[str, FormatTimeCallable]] = None, + level: TextType = "", + path: Optional[str] = None, + line_no: Optional[int] = None, + link_path: Optional[str] = None, + ) -> Table: + from rich.containers import Renderables + + output = Table.grid(padding=(0, 1)) + output.expand = True + output.add_column(style="log.time") + output.add_column(style="log.level", width=self.level_width) + output.add_column(ratio=1, style="log.message", overflow="fold") + if path: + output.add_column(style="log.path") + if line_no: + output.add_column(style="log.line_no", width=4) + row: List["RenderableType"] = [] + if self.show_time: + log_time = log_time or log_console.get_datetime() + time_format = time_format or self.time_format + if callable(time_format): + log_time_display = time_format(log_time) + else: + log_time_display = Text(log_time.strftime(time_format)) + if log_time_display == self.last_time and self.omit_repeated_times: + row.append(Text(" " * len(log_time_display))) + else: + row.append(log_time_display) + self.last_time = log_time_display + if self.show_level: + row.append(level) + + row.append(Renderables(renderables)) + if path: + path_text = Text() + path_text.append(path, style=f"link file://{link_path}" if link_path else "") + row.append(path_text) + + line_no_text = Text() + line_no_text.append(str(line_no), style=f"link file://{link_path}#{line_no}" if link_path else "") + row.append(line_no_text) + + output.add_row(*row) + return output + + +class Handler(DefaultRichHandler): + def __init__( + self, + *args, + rich_tracebacks: bool = True, + locals_max_depth: Optional[int] = config.logger.locals_max_depth, + **kwargs, + ) -> None: + super(Handler, self).__init__(*args, rich_tracebacks=rich_tracebacks, **kwargs) + self._log_render = LogRender() + self.console = log_console + self.tracebacks_show_locals = True + self.keywords = self.KEYWORDS + config.logger.render_keywords + self.locals_max_depth = locals_max_depth + + def render( + self, + *, + record: "LogRecord", + traceback: Optional[Traceback], + message_renderable: Optional["ConsoleRenderable"], + ) -> "ConsoleRenderable": + if record.pathname != "": + try: + path = str(Path(record.pathname).relative_to(PROJECT_ROOT)) + path = path.split(".")[0].replace(os.sep, ".") + except ValueError: + import site + + path = None + for s in site.getsitepackages(): + try: + path = str(Path(record.pathname).relative_to(Path(s))) + break + except ValueError: + continue + if path is None: + path = "" + else: + path = path.split(".")[0].replace(os.sep, ".") + else: + path = "" + path = path.replace("lib.site-packages.", "") + _level = self.get_level_text(record) + time_format = None if self.formatter is None else self.formatter.datefmt + log_time = datetime.fromtimestamp(record.created) + + log_renderable = self._log_render( + self.console, + ( + [message_renderable] + if not traceback + else ([message_renderable, traceback] if message_renderable is not None else [traceback]) + ), + log_time=log_time, + time_format=time_format, + level=_level, + path=path, + link_path=record.pathname if self.enable_link_path else None, + line_no=record.lineno, + ) + return log_renderable + + def render_message(self, record: "LogRecord", message: Any) -> "ConsoleRenderable": + use_markup = getattr(record, "markup", self.markup) + if isinstance(message, str): + message_text = Text.from_markup(message) if use_markup else Text(message) + highlighter = getattr(record, "highlighter", self.highlighter) + else: + from rich.highlighter import JSONHighlighter + from rich.json import JSON + + highlighter = JSONHighlighter() + message_text = JSON.from_data(message, indent=4).text + + if highlighter is not None: + # noinspection PyCallingNonCallable + message_text = highlighter(message_text) + + if self.keywords is None: + self.keywords = self.KEYWORDS + + if self.keywords: + message_text.highlight_words(self.keywords, "logging.keyword") + + return message_text + + def emit(self, record: "LogRecord") -> None: + message = self.format(record) + _traceback = None + if self.rich_tracebacks and record.exc_info and record.exc_info != (None, None, None): + exc_type, exc_value, exc_traceback = record.exc_info + if exc_type is None or exc_value is None: + raise ValueError(record) + _traceback = Traceback.from_exception( + exc_type, + exc_value, + exc_traceback, + width=self.tracebacks_width, + extra_lines=self.tracebacks_extra_lines, + word_wrap=self.tracebacks_word_wrap, + show_locals=getattr(record, "show_locals", None) or self.tracebacks_show_locals, + locals_max_length=getattr(record, "locals_max_length", None) or self.locals_max_length, + locals_max_string=getattr(record, "locals_max_string", None) or self.locals_max_string, + locals_max_depth=( + getattr(record, "locals_max_depth") + if hasattr(record, "locals_max_depth") + else self.locals_max_depth + ), + suppress=self.tracebacks_suppress, + ) + message = record.getMessage() + if self.formatter: + record.message = record.getMessage() + formatter = self.formatter + if hasattr(formatter, "usesTime") and formatter.usesTime(): + record.asctime = formatter.formatTime(record, formatter.datefmt) + message = formatter.formatMessage(record) + if message == str(exc_value): + message = None + + if message is not None: + try: + message_renderable = self.render_message(record, json.loads(message)) + except JSONDecodeError: + message_renderable = self.render_message(record, message) + else: + message_renderable = None + log_renderable = self.render(record=record, traceback=_traceback, message_renderable=message_renderable) + # noinspection PyBroadException + try: + self.console.print(log_renderable) + except Exception: # pylint: disable=W0703 + self.handleError(record) + + +class FileHandler(Handler): + def __init__(self, *args, path: Path, **kwargs): + super().__init__(*args, **kwargs) + while True: + try: + path.parent.mkdir(exist_ok=True) + break + except FileNotFoundError: + parent = path.parent + while True: + try: + parent.mkdir(exist_ok=True) + break + except FileNotFoundError: + parent = parent.parent + path.parent.mkdir(exist_ok=True) + self.console = Console(width=180, file=FileIO(path), theme=Theme(DEFAULT_STYLE)) diff --git a/utils/log/_logger.py b/utils/log/_logger.py index 2781b66..ac7c44d 100644 --- a/utils/log/_logger.py +++ b/utils/log/_logger.py @@ -2,59 +2,19 @@ import inspect import io import logging import os -import sys import traceback as traceback_ -from datetime import datetime from multiprocessing import RLock as Lock from pathlib import Path -from typing import Any, Callable, Dict, Iterable, List, Literal, Mapping, Optional, TYPE_CHECKING, Tuple, Union +from typing import Any, Callable, List, Mapping, Optional, TYPE_CHECKING, Tuple -import ujson as json -from rich.columns import Columns -from rich.console import ( - Console, - RenderResult, - group, -) -from rich.logging import ( - LogRender as DefaultLogRender, - RichHandler as DefaultRichHandler, -) -from rich.syntax import ( - PygmentsSyntaxTheme, - Syntax, -) -from rich.text import ( - Text, - TextType, -) -from rich.theme import Theme -from rich.traceback import ( - Stack, - Traceback as BaseTraceback, -) -from ujson import JSONDecodeError +from typing_extensions import Self from core.config import config -from utils.const import NOT_SET, PROJECT_ROOT -from utils.log._file import FileIO -from utils.log._style import ( - DEFAULT_STYLE, - MonokaiProStyle, -) +from utils.const import NOT_SET +from utils.log._handler import FileHandler, Handler from utils.typedefs import ExceptionInfoType if TYPE_CHECKING: - from rich.table import Table # pylint: disable=unused-import - from rich.console import ( # pylint: disable=unused-import - ConsoleRenderable, - RenderableType, - ) - from rich.console import ( # pylint: disable=unused-import - ConsoleRenderable, - RenderableType, - ) - from rich.table import Table # pylint: disable=unused-import from logging import LogRecord # pylint: disable=unused-import __all__ = ["logger"] @@ -62,359 +22,6 @@ __all__ = ["logger"] _lock = Lock() __initialized__ = False -FormatTimeCallable = Callable[[datetime], Text] - -logging.addLevelName(5, "TRACE") -logging.addLevelName(25, "SUCCESS") -color_system: Literal["windows", "truecolor"] -if sys.platform == "win32": - color_system = "windows" -else: - color_system = "truecolor" -# noinspection SpellCheckingInspection -log_console = Console(color_system=color_system, theme=Theme(DEFAULT_STYLE), width=config.logger.width) - - -class Traceback(BaseTraceback): - def __init__(self, *args, **kwargs): - kwargs.update( - { - "show_locals": True, - "max_frames": config.logger.traceback_max_frames, - "locals_max_length": 3, - "locals_max_string": 80, - } - ) - super(Traceback, self).__init__(*args, **kwargs) - self.theme = PygmentsSyntaxTheme(MonokaiProStyle) - - @group() - def _render_stack(self, stack: Stack) -> RenderResult: - from rich.traceback import ( - PathHighlighter, - Frame, - ) - - path_highlighter = PathHighlighter() - theme = self.theme - code_cache: Dict[str, str] = {} - - # noinspection PyShadowingNames - def read_code(filename: str) -> str: - code = code_cache.get(filename) - if code is None: - with open(filename, "rt", encoding="utf-8", errors="replace") as code_file: - code = code_file.read() - code_cache[filename] = code - return code - - # noinspection PyShadowingNames - def render_locals(frame: Frame) -> Iterable["ConsoleRenderable"]: - if frame.locals: - from rich.scope import render_scope - - yield render_scope( - frame.locals, - title="locals", - indent_guides=self.indent_guides, - max_length=self.locals_max_length, - max_string=self.locals_max_string, - ) - - exclude_frames: Optional[range] = None - if self.max_frames != 0: - exclude_frames = range( - self.max_frames // 2, - len(stack.frames) - self.max_frames // 2, - ) - - excluded = False - for frame_index, frame in enumerate(stack.frames): - - if exclude_frames and frame_index in exclude_frames: - excluded = True - continue - - if excluded: - if exclude_frames is None: - raise ValueError(exclude_frames) - yield Text( - f"\n... {len(exclude_frames)} frames hidden ...", - justify="center", - style="traceback.error", - ) - excluded = False - - first = frame_index == 0 - frame_filename = frame.filename - suppressed = any(frame_filename.startswith(path) for path in self.suppress) - - text = Text.assemble( - path_highlighter(Text(frame.filename, style="pygments.string")), - (":", "pygments.text"), - (str(frame.lineno), "pygments.number"), - " in ", - (frame.name, "pygments.function"), - style="pygments.text", - ) - if not frame.filename.startswith("<") and not first: - yield "" - yield text - if frame.filename.startswith("<"): - yield from render_locals(frame) - continue - if not suppressed: - try: - if self.width is not None: - code_width = self.width - 5 - else: - code_width = 100 - code = read_code(frame.filename) - lexer_name = self._guess_lexer(frame.filename, code) - syntax = Syntax( - code, - lexer_name, - theme=theme, - line_numbers=True, - line_range=( - frame.lineno - self.extra_lines, - frame.lineno + self.extra_lines, - ), - highlight_lines={frame.lineno}, - word_wrap=self.word_wrap, - code_width=code_width, - indent_guides=self.indent_guides, - dedent=False, - ) - yield "" - except Exception as error: # pylint: disable=W0703 - yield Text.assemble( - (f"\n{error}", "traceback.error"), - ) - else: - yield ( - Columns( - [ - syntax, - *render_locals(frame), - ], - padding=1, - ) - if frame.locals - else syntax - ) - - -class LogRender(DefaultLogRender): - @property - def last_time(self): - return self._last_time - - @last_time.setter - def last_time(self, last_time): - self._last_time = last_time - - def __init__(self, *args, **kwargs): - super(LogRender, self).__init__(*args, **kwargs) - self.show_level = True - self.time_format = config.logger.time_format - - def __call__( - self, - console: "Console", - renderables: Iterable["ConsoleRenderable"], - log_time: Optional[datetime] = None, - time_format: Optional[Union[str, FormatTimeCallable]] = None, - level: TextType = "", - path: Optional[str] = None, - line_no: Optional[int] = None, - link_path: Optional[str] = None, - ) -> "Table": - from rich.containers import Renderables - from rich.table import Table - - output = Table.grid(padding=(0, 1)) - output.expand = True - output.add_column(style="log.time") - output.add_column(style="log.level", width=self.level_width) - output.add_column(ratio=1, style="log.message", overflow="fold") - if path: - output.add_column(style="log.path") - if line_no: - output.add_column(style="log.line_no", width=4) - row: List["RenderableType"] = [] - if self.show_time: - log_time = log_time or log_console.get_datetime() - time_format = time_format or self.time_format - if callable(time_format): - log_time_display = time_format(log_time) - else: - log_time_display = Text(log_time.strftime(time_format)) - if log_time_display == self.last_time and self.omit_repeated_times: - row.append(Text(" " * len(log_time_display))) - else: - row.append(log_time_display) - self.last_time = log_time_display - if self.show_level: - row.append(level) - - row.append(Renderables(renderables)) - if path: - path_text = Text() - path_text.append(path, style=f"link file://{link_path}" if link_path else "") - row.append(path_text) - - line_no_text = Text() - line_no_text.append(str(line_no), style=f"link file://{link_path}#{line_no}" if link_path else "") - row.append(line_no_text) - - output.add_row(*row) - return output - - -class Handler(DefaultRichHandler): - def __init__(self, *args, **kwargs): - super(Handler, self).__init__(*args, **kwargs) - self._log_render = LogRender() - self.console = log_console - self.rich_tracebacks = True - self.tracebacks_show_locals = True - self.keywords = self.KEYWORDS + config.logger.render_keywords - - def render( - self, - *, - record: "LogRecord", - traceback: Optional[Traceback], - message_renderable: Optional["ConsoleRenderable"], - ) -> "ConsoleRenderable": - if record.pathname != "": - try: - path = str(Path(record.pathname).relative_to(PROJECT_ROOT)) - path = path.split(".")[0].replace(os.sep, ".") - except ValueError: - import site - - path = None - for s in site.getsitepackages(): - try: - path = str(Path(record.pathname).relative_to(Path(s))) - break - except ValueError: - continue - if path is None: - path = "" - else: - path = path.split(".")[0].replace(os.sep, ".") - else: - path = "" - path = path.replace("lib.site-packages.", "") - _level = self.get_level_text(record) - time_format = None if self.formatter is None else self.formatter.datefmt - log_time = datetime.fromtimestamp(record.created) - - log_renderable = self._log_render( - self.console, - ( - [message_renderable] - if not traceback - else ([message_renderable, traceback] if message_renderable is not None else [traceback]) - ), - log_time=log_time, - time_format=time_format, - level=_level, - path=path, - link_path=record.pathname if self.enable_link_path else None, - line_no=record.lineno, - ) - return log_renderable - - def render_message(self, record: "LogRecord", message: Any) -> "ConsoleRenderable": - use_markup = getattr(record, "markup", self.markup) - if isinstance(message, str): - message_text = Text.from_markup(message) if use_markup else Text(message) - highlighter = getattr(record, "highlighter", self.highlighter) - else: - from rich.highlighter import JSONHighlighter - from rich.json import JSON - - highlighter = JSONHighlighter() - message_text = JSON.from_data(message, indent=4).text - - if highlighter is not None: - # noinspection PyCallingNonCallable - message_text = highlighter(message_text) - - if self.keywords is None: - self.keywords = self.KEYWORDS - - if self.keywords: - message_text.highlight_words(self.keywords, "logging.keyword") - - return message_text - - def emit(self, record: "LogRecord") -> None: - message = self.format(record) - _traceback = None - if self.rich_tracebacks and record.exc_info and record.exc_info != (None, None, None): - exc_type, exc_value, exc_traceback = record.exc_info - if exc_type is None or exc_value is None: - raise ValueError(record) - _traceback = Traceback.from_exception( - exc_type, - exc_value, - exc_traceback, - width=self.tracebacks_width, - extra_lines=self.tracebacks_extra_lines, - word_wrap=self.tracebacks_word_wrap, - show_locals=self.tracebacks_show_locals, - locals_max_length=self.locals_max_length, - locals_max_string=self.locals_max_string, - suppress=self.tracebacks_suppress, - ) - message = record.getMessage() - if self.formatter: - record.message = record.getMessage() - formatter = self.formatter - if hasattr(formatter, "usesTime") and formatter.usesTime(): - record.asctime = formatter.formatTime(record, formatter.datefmt) - message = formatter.formatMessage(record) - if message == str(exc_value): - message = None - - if message is not None: - try: - message_renderable = self.render_message(record, json.loads(message)) - except JSONDecodeError: - message_renderable = self.render_message(record, message) - else: - message_renderable = None - log_renderable = self.render(record=record, traceback=_traceback, message_renderable=message_renderable) - # noinspection PyBroadException - try: - self.console.print(log_renderable) - except Exception: # pylint: disable=W0703 - self.handleError(record) - - -class FileHandler(Handler): - def __init__(self, *args, path: Path, **kwargs): - super().__init__(*args, **kwargs) - while True: - try: - path.parent.mkdir(exist_ok=True) - break - except FileNotFoundError: - parent = path.parent - while True: - try: - parent.mkdir(exist_ok=True) - break - except FileNotFoundError: - parent = parent.parent - path.parent.mkdir(exist_ok=True) - self.console = Console(width=180, file=FileIO(path), theme=Theme(DEFAULT_STYLE)) - class Logger(logging.Logger): def success( @@ -481,20 +88,59 @@ class Logger(logging.Logger): return rv +class LogFilter(logging.Filter): + _filter_list: List[Callable[["LogRecord"], bool]] = [] + + def __init__(self, name: str = ""): + super().__init__(name=name) + + def add_filter(self, f: Callable[["LogRecord"], bool]) -> Self: + if f not in self._filter_list: + self._filter_list.append(f) + return self + + def filter(self, record: "LogRecord") -> bool: + return all(map(lambda func: func(record), self._filter_list)) + + +def default_filter(record: "LogRecord") -> bool: + """默认的过滤器""" + return record.name.split(".")[0] in ["TGPaimon", "uvicorn"] + + with _lock: if not __initialized__: if "PYCHARM_HOSTED" in os.environ: print() # 针对 pycharm 的控制台 bug logging.captureWarnings(True) handler, debug_handler, error_handler = ( - Handler(locals_max_length=4), - FileHandler(level=10, path=config.logger.path.joinpath("debug/debug.log")), - FileHandler(level=40, path=config.logger.path.joinpath("error/error.log")), + # 控制台 log 配置 + Handler( + locals_max_length=config.logger.locals_max_length, + locals_max_string=config.logger.locals_max_string, + locals_max_depth=config.logger.locals_max_depth, + ), + # debug.log 配置 + FileHandler( + level=10, + path=config.logger.path.joinpath("debug/debug.log"), + locals_max_depth=1, + locals_max_length=config.logger.locals_max_length, + locals_max_string=config.logger.locals_max_string, + ), + # error.log 配置 + FileHandler( + level=40, + path=config.logger.path.joinpath("error/error.log"), + locals_max_length=config.logger.locals_max_length, + locals_max_string=config.logger.locals_max_string, + locals_max_depth=config.logger.locals_max_depth, + ), ) - log_filter = logging.Filter("TGPaimon") - handler.addFilter(log_filter) - debug_handler.addFilter(log_filter) + default_log_filter = LogFilter().add_filter(default_filter) + handler.addFilter(default_log_filter) + debug_handler.addFilter(default_log_filter) level_ = 10 if config.debug else 20 logging.basicConfig( diff --git a/utils/log/_traceback.py b/utils/log/_traceback.py new file mode 100644 index 0000000..5d319f1 --- /dev/null +++ b/utils/log/_traceback.py @@ -0,0 +1,366 @@ +import os +import traceback as traceback_ +from types import ModuleType, TracebackType +from typing import ( + Any, + Dict, + Iterable, + List, + Mapping, + Optional, + TYPE_CHECKING, + Tuple, + Type, + Union, +) + +from rich import pretty +from rich.columns import Columns +from rich.console import ( + RenderResult, + group, +) +from rich.highlighter import ReprHighlighter +from rich.panel import Panel +from rich.pretty import Pretty +from rich.syntax import ( + PygmentsSyntaxTheme, + Syntax, +) +from rich.table import Table +from rich.text import ( + Text, + TextType, +) +from rich.traceback import ( + Frame, + PathHighlighter, + Stack, + Trace, + Traceback as BaseTraceback, +) + +from core.config import config +from utils.log._style import ( + MonokaiProStyle, +) + +if TYPE_CHECKING: + from rich.console import ConsoleRenderable # pylint: disable=W0611 + +__all__ = ["render_scope", "Traceback"] + + +def render_scope( + scope: Mapping[str, Any], + *, + title: Optional[TextType] = None, + sort_keys: bool = False, + indent_guides: bool = False, + max_length: Optional[int] = None, + max_string: Optional[int] = None, + max_depth: Optional[int] = None, +) -> "ConsoleRenderable": + """在给定范围内渲染 python 变量 + + Args: + scope (Mapping): 包含变量名称和值的映射. + title (str, optional): 标题. 默认为 None. + sort_keys (bool, optional): 启用排序. 默认为 True. + indent_guides (bool, optional): 启用缩进线. 默认为 False. + max_length (int, optional): 缩写前容器的最大长度; 若为 None , 则表示没有缩写. 默认为 None. + max_string (int, optional): 截断前字符串的最大长度; 若为 None , 则表示不会截断. 默认为 None. + max_depth (int, optional): 嵌套数据结构的最大深度; 若为 None , 则表示会一直递归访问至最后一层. 默认为 None. + + Returns: + ConsoleRenderable: 可被 rich 渲染的对象. + """ + highlighter = ReprHighlighter() + items_table = Table.grid(padding=(0, 1), expand=False) + items_table.add_column(justify="right") + + def sort_items(item: Tuple[str, Any]) -> Tuple[bool, str]: + # noinspection PyShadowingNames + key, _ = item + return not key.startswith("__"), key.lower() + + # noinspection PyTypeChecker + items = sorted(scope.items(), key=sort_items) if sort_keys else scope.items() + for key, value in items: + key_text = Text.assemble( + (key, "scope.key.special" if key.startswith("__") else "scope.key"), + (" =", "scope.equals"), + ) + items_table.add_row( + key_text, + Pretty( + value, + highlighter=highlighter, + indent_guides=indent_guides, + max_length=max_length, + max_string=max_string, + max_depth=max_depth, + ), + ) + return Panel.fit( + items_table, + title=title, + border_style="scope.border", + padding=(0, 1), + ) + + +class Traceback(BaseTraceback): + locals_max_depth: Optional[int] + + def __init__(self, *args, locals_max_depth: Optional[int] = None, **kwargs): + kwargs.update({"show_locals": True, "max_frames": config.logger.traceback_max_frames}) + super(Traceback, self).__init__(*args, **kwargs) + self.locals_max_depth = locals_max_depth + + @classmethod + def from_exception( + cls, + exc_type: Type[BaseException], + exc_value: BaseException, + traceback: Optional[TracebackType], + width: Optional[int] = 100, + extra_lines: int = 3, + theme: Optional[str] = None, + word_wrap: bool = False, + show_locals: bool = False, + indent_guides: bool = True, + locals_max_length: int = config.logger.locals_max_length, + locals_max_string: int = config.logger.locals_max_string, + locals_max_depth: Optional[int] = config.logger_locals_max_depth, + suppress: Iterable[Union[str, ModuleType]] = (), + max_frames: int = 100, + ) -> "Traceback": + rich_traceback = cls.extract( + exc_type=exc_type, + exc_value=exc_value, + traceback=traceback, + show_locals=show_locals, + locals_max_depth=locals_max_depth, + locals_max_string=locals_max_string, + locals_max_length=locals_max_length, + ) + return cls( + rich_traceback, + width=width, + extra_lines=extra_lines, + theme=PygmentsSyntaxTheme(MonokaiProStyle), + word_wrap=word_wrap, + show_locals=show_locals, + indent_guides=indent_guides, + locals_max_length=locals_max_length, + locals_max_string=locals_max_string, + locals_max_depth=locals_max_depth, + suppress=suppress, + max_frames=max_frames, + ) + + @classmethod + def extract( + cls, + exc_type: Type[BaseException], + exc_value: BaseException, + traceback: Optional[TracebackType], + show_locals: bool = False, + locals_max_length: int = 10, + locals_max_string: int = 80, + locals_max_depth: Optional[int] = None, + ) -> Trace: + # noinspection PyProtectedMember + from rich import _IMPORT_CWD + + stacks: List[Stack] = [] + is_cause = False + + def safe_str(_object: Any) -> str: + # noinspection PyBroadException + try: + return str(_object) + except Exception: # pylint: disable=W0703 + return "" + + while True: + stack = Stack( + exc_type=safe_str(exc_type.__name__), + exc_value=safe_str(exc_value), + is_cause=is_cause, + ) + + if isinstance(exc_value, SyntaxError): + # noinspection PyProtectedMember + from rich.traceback import _SyntaxError + + stack.syntax_error = _SyntaxError( + offset=exc_value.offset or 0, + filename=exc_value.filename or "?", + lineno=exc_value.lineno or 0, + line=exc_value.text or "", + msg=exc_value.msg, + ) + + stacks.append(stack) + append = stack.frames.append + + for frame_summary, line_no in traceback_.walk_tb(traceback): + filename = frame_summary.f_code.co_filename + if filename and not filename.startswith("<") and not os.path.isabs(filename): + filename = os.path.join(_IMPORT_CWD, filename) + if frame_summary.f_locals.get("_rich_traceback_omit", False): + continue + frame = Frame( + filename=filename or "?", + lineno=line_no, + name=frame_summary.f_code.co_name, + locals={ + key: pretty.traverse( + value, + max_length=locals_max_length, + max_string=locals_max_string, + max_depth=locals_max_depth, + ) + for key, value in frame_summary.f_locals.items() + } + if show_locals + else None, + ) + append(frame) + if frame_summary.f_locals.get("_rich_traceback_guard", False): + del stack.frames[:] + + cause = getattr(exc_value, "__cause__", None) + if cause: + exc_type = cause.__class__ + exc_value = cause + # __traceback__ can be None, e.g. for exceptions raised by the + # 'multiprocessing' module + traceback = cause.__traceback__ + is_cause = True + continue + + cause = exc_value.__context__ + if cause and not getattr(exc_value, "__suppress_context__", False): + exc_type = cause.__class__ + exc_value = cause + traceback = cause.__traceback__ + is_cause = False + continue + # No cover, code is reached but coverage doesn't recognize it. + break # pragma: no cover + + trace = Trace(stacks=stacks) + return trace + + @group() + def _render_stack(self, stack: Stack) -> RenderResult: + path_highlighter = PathHighlighter() + theme = self.theme + code_cache: Dict[str, str] = {} + + # noinspection PyShadowingNames + def read_code(filename: str) -> str: + code = code_cache.get(filename) + if code is None: + with open(filename, "rt", encoding="utf-8", errors="replace") as code_file: + code = code_file.read() + code_cache[filename] = code + return code + + # noinspection PyShadowingNames + def render_locals(frame: Frame) -> Iterable["ConsoleRenderable"]: + if frame.locals: + yield render_scope( + scope=frame.locals, + title="locals", + indent_guides=self.indent_guides, + max_length=self.locals_max_length, + max_string=self.locals_max_string, + max_depth=self.locals_max_depth, + ) + + exclude_frames: Optional[range] = None + if self.max_frames != 0: + exclude_frames = range( + self.max_frames // 2, + len(stack.frames) - self.max_frames // 2, + ) + + excluded = False + for frame_index, frame in enumerate(stack.frames): + + if exclude_frames and frame_index in exclude_frames: + excluded = True + continue + + if excluded: + if exclude_frames is None: + raise ValueError(exclude_frames) + yield Text( + f"\n... {len(exclude_frames)} frames hidden ...", + justify="center", + style="traceback.error", + ) + excluded = False + + first = frame_index == 0 + frame_filename = frame.filename + suppressed = any(frame_filename.startswith(path) for path in self.suppress) + + text = Text.assemble( + path_highlighter(Text(frame.filename, style="pygments.string")), + (":", "pygments.text"), + (str(frame.lineno), "pygments.number"), + " in ", + (frame.name, "pygments.function"), + style="pygments.text", + ) + if not frame.filename.startswith("<") and not first: + yield "" + yield text + if frame.filename.startswith("<"): + yield from render_locals(frame) + continue + if not suppressed: + try: + if self.width is not None: + code_width = self.width - 5 + else: + code_width = 100 + code = read_code(frame.filename) + lexer_name = self._guess_lexer(frame.filename, code) + syntax = Syntax( + code, + lexer_name, + theme=theme, + line_numbers=True, + line_range=( + frame.lineno - self.extra_lines, + frame.lineno + self.extra_lines, + ), + highlight_lines={frame.lineno}, + word_wrap=self.word_wrap, + code_width=code_width, + indent_guides=self.indent_guides, + dedent=False, + ) + yield "" + except Exception as error: # pylint: disable=W0703 + yield Text.assemble( + (f"\n{error}", "traceback.error"), + ) + else: + yield ( + Columns( + [ + syntax, + *render_locals(frame), + ], + padding=1, + ) + if frame.locals + else syntax + )