Add pg_logger

This commit is contained in:
Karako 2023-04-02 16:48:48 +08:00
parent f83a6361b1
commit 393a6337a2
No known key found for this signature in database
GPG Key ID: 5920831B0095D4A0
8 changed files with 1382 additions and 0 deletions

View File

@ -0,0 +1,6 @@
from pg_logger._config import LoggerConfig
from pg_logger._handler import *
from pg_logger._logger import (
LogFilter,
Logger,
)

62
src/pg_logger/_config.py Normal file
View File

@ -0,0 +1,62 @@
from multiprocessing import RLock as Lock
from pathlib import Path
from typing import (
List,
Optional,
Union,
)
from pydantic import BaseSettings
from pg_logger._typedefs import COLOR_SYSTEM
__all__ = ("LoggerConfig",)
class LoggerConfig(BaseSettings):
_lock = Lock()
_instance: Optional["LoggerConfig"] = None
def __new__(cls, *args, **kwargs) -> "LoggerConfig":
with cls._lock:
if cls._instance is None:
cls.update_forward_refs()
result = super(LoggerConfig, cls).__new__(cls)
result.__init__(*args, **kwargs)
cls._instance = result
return cls._instance
name: str = "PaiGram-Logger"
"""logger 的名称"""
level: Optional[Union[str, int]] = None
"""logger 的 level"""
debug: bool = False
"""是否 debug"""
width: int = 180
"""输出时的宽度"""
keywords: List[str] = []
"""高亮的关键字"""
time_format: str = "[%Y-%m-%d %X]"
"""时间格式"""
capture_warnings: bool = True
"""是否捕获 warning"""
color_system: COLOR_SYSTEM = "auto"
"""颜色模式: 自动、标准、256色、真彩、Windows模式"""
log_path: Union[str, Path] = "./logs"
"""log 所保存的路径,项目根目录的相对路径"""
project_root: Union[str, Path] = Path(".")
"""项目根目录"""
traceback_max_frames: int = 20
"""Traceback 的最大回溯帧数"""
traceback_locals_max_depth: Optional[int] = None
"""Traceback 的 locals 变量的最大访问深度"""
traceback_locals_max_length: int = 10
"""打印 Traceback 的 locals 变量时的最大长度"""
traceback_locals_max_string: int = 80
"""打印 Traceback 的 locals 变量时的最大字符数"""
class Config(BaseSettings.Config):
env_prefix = "logger_"

148
src/pg_logger/_file.py Normal file
View File

@ -0,0 +1,148 @@
import os
from datetime import date
from pathlib import Path
from types import TracebackType
from typing import (
AnyStr,
IO,
Iterable,
Iterator,
List,
Optional,
Type,
)
__all__ = ("FileIO",)
# noinspection SpellCheckingInspection
class FileIO(IO[str]):
def __init__(self, path: Path):
if path.suffix != "":
path.parent.mkdir(exist_ok=True, parents=True)
else:
path.mkdir(exist_ok=True, parents=True)
path = path.joinpath("log.log")
self._path = path
self._stream: Optional[IO[str]] = None
def _get_file_name(self, time: date) -> Path:
"""获取对应 date 的 log 文件名"""
num = 0
def get_path(n) -> Path:
time_string = time.strftime("%Y-%m-%d")
num_string = str(n).rjust(2, "0")
return self.dir.joinpath(
f"{time_string}" f"{f'-{num_string}' if n else ''}.log"
)
path = base = get_path(num)
while path.exists():
path = get_path(num := num + 1)
if base.exists():
base.rename(path)
return get_path(num + 1)
else:
return get_path(num)
@property
def dir(self) -> Path:
"""日志文件所在的文件夹"""
return self._path.parent
@property
def _file_stream(self) -> IO[str]:
"""日志文件的IO流"""
# if self._stream:
# self._stream.close()
old_straem_path: Optional[Path] = None
if self._stream is not None and not self._stream.closed:
old_straem_path = Path(self._stream.name).resolve()
today = date.today()
if not self._path.exists():
# 若日志文件不存在
self._stream = self._path.open(mode="a+", encoding="utf-8")
return self._stream
modify_date = date.fromtimestamp(os.stat(self._path).st_mtime) # 日志文件的修改日期
if modify_date != today:
if self._stream is not None and not self._stream.closed:
self._stream.close()
_file = self._get_file_name(modify_date)
self._path.rename(_file)
if (
old_straem_path is not None
and old_straem_path == self._path.resolve()
and not self._stream.closed
):
return self._stream
self._stream = self._path.open(mode="a+", encoding="utf-8")
return self._stream
def close(self) -> None:
return self._file_stream.close()
def fileno(self) -> int:
return self._file_stream.fileno()
def flush(self) -> None:
return self._file_stream.flush()
def isatty(self) -> bool:
return self._file_stream.isatty()
def read(self, __n: int = -1) -> AnyStr:
return self._file_stream.read(__n)
def readable(self) -> bool:
return self._file_stream.readable()
def readline(self, __limit: int = ...) -> AnyStr:
return self._file_stream.readline()
def readlines(self, __hint: int = ...) -> List[AnyStr]:
return self._file_stream.readlines()
def seek(self, __offset: int, __whence: int = 0) -> int:
return self._file_stream.seek(__offset, __whence)
def seekable(self) -> bool:
return self._file_stream.seekable()
def tell(self) -> int:
return self._file_stream.tell()
def truncate(self, __size: Optional[int] = None) -> int:
return self._file_stream.truncate(__size)
def writable(self) -> bool:
return self._file_stream.writable()
def write(self, __s: AnyStr) -> int:
return self._file_stream.write(__s)
def writelines(self, __lines: Iterable[AnyStr]) -> None:
return self._file_stream.writelines(__lines)
def __next__(self) -> AnyStr:
return self._file_stream.__next__()
def __iter__(self) -> Iterator[AnyStr]:
return self._file_stream.__iter__()
def __enter__(self) -> IO[AnyStr]:
return self._file_stream.__enter__()
def __exit__(
self,
__t: Optional[Type[BaseException]],
__value: Optional[BaseException],
__traceback: Optional[TracebackType],
) -> None:
return self._file_stream.__exit__(__t, __value, __traceback)

312
src/pg_logger/_handler.py Normal file
View File

@ -0,0 +1,312 @@
from functools import cached_property
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import (
Any,
Callable,
Iterable,
List,
Optional,
TYPE_CHECKING,
Union,
)
from rich.console import Console
from rich.containers import Renderables
from rich.logging import (
LogRender as DefaultLogRender,
RichHandler as DefaultRichHandler,
)
from rich.table import Table
from rich.text import (
Text,
TextType,
)
from rich.theme import Theme
from pg_logger._file import FileIO
from pg_logger._style import DEFAULT_STYLE
from pg_logger._traceback import Traceback
from pg_logger._typedefs import COLOR_SYSTEM
try:
import ujson as json
from ujson import JSONDecodeError
except ImportError:
import json
from json import JSONDecodeError
if TYPE_CHECKING:
from rich.console import (
ConsoleRenderable,
RenderableType,
)
from logging import LogRecord
__all__ = ["LogRender", "Handler", "FileHandler"]
FormatTimeCallable = Callable[[datetime], Text]
logging.addLevelName(5, "TRACE")
logging.addLevelName(25, "SUCCESS")
class LogRender(DefaultLogRender):
@property
def last_time(self):
return self._last_time
@last_time.setter
def last_time(self, last_time):
self._last_time = last_time
@cached_property
def levels_width(self) -> int:
# noinspection PyUnresolvedReferences,PyProtectedMember
return max(map(len, logging._nameToLevel.keys()))
def __call__(
self,
console: "Console",
renderables: Iterable["ConsoleRenderable"],
log_time: Optional[datetime] = None,
time_format: Optional[Union[str, FormatTimeCallable]] = None,
level: TextType = "",
path: Optional[str] = None,
line_no: Optional[int] = None,
link_path: Optional[str] = None,
) -> Table:
output = Table.grid(padding=(0, 1))
output.expand = True
output.add_column(style="log.time")
output.add_column(style="log.level", width=self.levels_width)
output.add_column(ratio=1, style="log.message", overflow="fold")
if path:
output.add_column(style="log.path")
if line_no:
output.add_column(style="log.line_no", width=4)
row: List["RenderableType"] = []
if self.show_time:
log_time = log_time or console.get_datetime()
time_format = time_format or self.time_format
if callable(time_format):
log_time_display = time_format(log_time)
else:
log_time_display = Text(log_time.strftime(time_format))
if log_time_display == self.last_time and self.omit_repeated_times:
row.append(Text(" " * len(log_time_display)))
else:
row.append(log_time_display)
self.last_time = log_time_display
if self.show_level:
row.append(level)
row.append(Renderables(renderables))
if path:
path_text = Text()
path_text.append(
path, style=f"link file://{link_path}" if link_path else ""
)
row.append(path_text)
line_no_text = Text()
line_no_text.append(
str(line_no),
style=f"link file://{link_path}#{line_no}" if link_path else "",
)
row.append(line_no_text)
output.add_row(*row)
return output
class Handler(DefaultRichHandler):
def __init__(
self,
*args,
width: int = None,
rich_tracebacks: bool = True,
locals_max_depth: Optional[int] = None,
tracebacks_max_frames: int = 100,
keywords: Optional[List[str]] = None,
log_time_format: Union[str, FormatTimeCallable] = "[%x %X]",
project_root: Union[str, Path] = os.getcwd(),
color_system: COLOR_SYSTEM = "auto",
**kwargs,
) -> None:
super(Handler, self).__init__(*args, rich_tracebacks=rich_tracebacks, **kwargs)
self._log_render = LogRender(time_format=log_time_format, show_level=True)
self.console = Console(
color_system=color_system, theme=Theme(DEFAULT_STYLE), width=width
)
self.tracebacks_show_locals = True
self.tracebacks_max_frames = tracebacks_max_frames
self.render_keywords = self.KEYWORDS + (keywords or [])
self.locals_max_depth = locals_max_depth
self.project_root = Path(project_root).resolve()
def render(
self,
*,
record: "LogRecord",
traceback: Optional[Traceback],
message_renderable: Optional["ConsoleRenderable"],
) -> "ConsoleRenderable":
if record.pathname != "<input>":
try:
path = str(Path(record.pathname).relative_to(self.project_root).with_suffix(''))
path = path.replace(os.sep, ".")
except ValueError:
import site
path = None
for s in site.getsitepackages():
try:
path = str(Path(record.pathname).relative_to(Path(s)))
break
except ValueError:
continue
if path is None:
path = "<PKGS>"
else:
path = path.split(".")[0].replace(os.sep, ".")
else:
path = "<INPUT>"
path = path.replace("lib.site-packages.", "")
_level = self.get_level_text(record)
time_format = None if self.formatter is None else self.formatter.datefmt
log_time = datetime.fromtimestamp(record.created)
if not traceback:
renderables = [message_renderable]
else:
renderables = (
[message_renderable, traceback]
if message_renderable is not None
else [traceback]
)
log_renderable = self._log_render(
self.console,
renderables,
log_time=log_time,
time_format=time_format,
level=_level,
path=path,
link_path=record.pathname if self.enable_link_path else None,
line_no=record.lineno,
)
return log_renderable
def render_message(
self,
record: "LogRecord",
message: Any,
) -> "ConsoleRenderable":
use_markup = getattr(record, "markup", self.markup)
if isinstance(message, str):
message_text = Text.from_markup(message) if use_markup else Text(message)
highlighter = getattr(record, "highlighter", self.highlighter)
else:
from rich.highlighter import JSONHighlighter
from rich.json import JSON
highlighter = JSONHighlighter()
message_text = JSON.from_data(message, indent=4).text
if highlighter is not None:
# noinspection PyCallingNonCallable
message_text = highlighter(message_text)
if self.render_keywords is None:
self.render_keywords = self.KEYWORDS
if self.render_keywords:
message_text.highlight_words(self.render_keywords, "logging.keyword")
return message_text
def emit(self, record: "LogRecord") -> None:
message = self.format(record)
_traceback = None
if (
self.rich_tracebacks
and record.exc_info
and record.exc_info != (None, None, None)
):
exc_type, exc_value, exc_traceback = record.exc_info
if exc_type is None or exc_value is None:
raise ValueError(record)
try:
_traceback = Traceback.from_exception(
exc_type,
exc_value,
exc_traceback,
width=self.tracebacks_width,
extra_lines=self.tracebacks_extra_lines,
word_wrap=self.tracebacks_word_wrap,
show_locals=(
getattr(record, "show_locals", None)
or self.tracebacks_show_locals
),
locals_max_length=(
getattr(record, "locals_max_length", None)
or self.locals_max_length
),
locals_max_string=(
getattr(record, "locals_max_string", None)
or self.locals_max_string
),
locals_max_depth=(
getattr(record, "locals_max_depth")
if hasattr(record, "locals_max_depth")
else self.locals_max_depth
),
suppress=self.tracebacks_suppress,
max_frames=self.tracebacks_max_frames,
)
except ImportError:
return
message = record.getMessage()
if self.formatter:
record.message = record.getMessage()
formatter = self.formatter
if hasattr(formatter, "usesTime") and formatter.usesTime():
record.asctime = formatter.formatTime(record, formatter.datefmt)
message = formatter.formatMessage(record)
if message == str(exc_value):
message = None
if message is not None:
try:
message_renderable = self.render_message(record, json.loads(message))
except JSONDecodeError:
message_renderable = self.render_message(record, message)
else:
message_renderable = None
log_renderable = self.render(
record=record, traceback=_traceback, message_renderable=message_renderable
)
# noinspection PyBroadException
try:
self.console.print(log_renderable)
except Exception: # pylint: disable=W0703
self.handleError(record)
class FileHandler(Handler):
def __init__(
self,
*args,
width: int = None,
path: Path,
**kwargs,
) -> None:
super().__init__(*args, **kwargs)
path.parent.mkdir(exist_ok=True, parents=True)
self.console = Console(
width=width, file=FileIO(path), theme=Theme(DEFAULT_STYLE)
)

208
src/pg_logger/_logger.py Normal file
View File

@ -0,0 +1,208 @@
import inspect
import io
import logging
import os
import traceback as traceback_
from multiprocessing import RLock as Lock
from pathlib import Path
from types import TracebackType
from typing import (
Any,
Callable,
List,
Mapping,
Optional,
TYPE_CHECKING,
Tuple,
Type,
Union,
)
from typing_extensions import Self
from pg_logger._handler import (
FileHandler,
Handler,
)
if TYPE_CHECKING:
from pg_logger import LoggerConfig
from logging import LogRecord
__all__ = ["Logger", "LogFilter"]
SysExcInfoType = Union[
Tuple[Type[BaseException], BaseException, Optional[TracebackType]],
Tuple[None, None, None],
]
ExceptionInfoType = Union[bool, SysExcInfoType, BaseException]
_lock = Lock()
NONE = object()
class Logger(logging.Logger):
_instance: Optional["Logger"] = None
def __new__(cls, *args, **kwargs) -> "Logger":
with _lock:
if cls._instance is None:
result = super(Logger, cls).__new__(cls)
cls._instance = result
return cls._instance
def __init__(self, config: "LoggerConfig" = None) -> None:
from pg_logger import LoggerConfig
self.config = config or LoggerConfig()
level_ = 10 if self.config.debug else 20
super().__init__(name=self.config.name, level=self.config.level or level_)
log_path = Path(self.config.project_root).joinpath(self.config.log_path)
handler_config = {
"width": self.config.width,
"locals_max_length": self.config.traceback_locals_max_length,
"locals_max_string": self.config.traceback_locals_max_string,
"project_root": self.config.project_root,
"log_time_format": self.config.time_format,
"color_system": self.config.color_system,
}
handler, debug_handler, error_handler = (
# 控制台 log 配置
Handler(
level=level_,
locals_max_depth=self.config.traceback_locals_max_depth,
**handler_config,
),
# debug.log 配置
FileHandler(
level=10,
path=log_path.joinpath("debug/debug.log"),
locals_max_depth=1,
**handler_config,
),
# error.log 配置
FileHandler(
level=40,
path=log_path.joinpath("error/error.log"),
locals_max_depth=self.config.traceback_locals_max_depth,
**handler_config,
),
)
logging.basicConfig(
level=10 if self.config.debug else 20,
format="%(message)s",
datefmt=self.config.time_format,
handlers=[handler, debug_handler, error_handler],
)
if config.capture_warnings:
logging.captureWarnings(True)
warnings_logger = logging.getLogger("py.warnings")
warnings_logger.addHandler(handler)
warnings_logger.addHandler(debug_handler)
self.addHandler(handler)
self.addHandler(debug_handler)
self.addHandler(error_handler)
def success(
self,
msg: Any,
*args: Any,
exc_info: Optional[ExceptionInfoType] = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Optional[Mapping[str, Any]] = None,
) -> None:
return self.log(
25,
msg,
*args,
exc_info=exc_info,
stack_info=stack_info,
stacklevel=stacklevel,
extra=extra,
)
def exception(
self,
msg: Any = NONE,
*args: Any,
exc_info: Optional[ExceptionInfoType] = True,
stack_info: bool = False,
stacklevel: int = 1,
extra: Optional[Mapping[str, Any]] = None,
**kwargs,
) -> None: # pylint: disable=W1113
super(Logger, self).exception(
"" if msg is NONE else msg,
*args,
exc_info=exc_info,
stack_info=stack_info,
stacklevel=stacklevel,
extra=extra,
)
def findCaller(
self, stack_info: bool = False, stacklevel: int = 1
) -> Tuple[str, int, str, Optional[str]]:
frame = inspect.currentframe()
if frame is not None:
frame = frame.f_back
original_frame = frame
while frame and stacklevel > 1:
frame = frame.f_back
stacklevel -= 1
if not frame:
frame = original_frame
rv = "(unknown file)", 0, "(unknown function)", None
while hasattr(frame, "f_code"):
code = frame.f_code
filename = os.path.normcase(code.co_filename)
if filename in [
os.path.normcase(Path(__file__).resolve()),
os.path.normcase(logging.addLevelName.__code__.co_filename),
]:
frame = frame.f_back
continue
sinfo = None
if stack_info:
sio = io.StringIO()
sio.write("Stack (most recent call last):\n")
traceback_.print_stack(frame, file=sio)
sinfo = sio.getvalue()
if sinfo[-1] == "\n":
sinfo = sinfo[:-1]
sio.close()
rv = (code.co_filename, frame.f_lineno, code.co_name, sinfo)
break
return rv
# noinspection PyShadowingBuiltins
def addFilter(
self, filter: Union[logging.Filter, Callable[["LogRecord"], int]]
) -> None:
for handler in self.handlers:
handler.addFilter(filter)
class LogFilter(logging.Filter):
_filter_list: List[Callable[["LogRecord"], bool]] = []
def __init__(self, name: str = ""):
super().__init__(name=name)
def add_filter(self, f: Callable[["LogRecord"], bool]) -> Self:
if f not in self._filter_list:
self._filter_list.append(f)
return self
def filter(self, record: "LogRecord") -> bool:
"""确定是否要记录指定的记录。
如果应记录记录则返回 True;否则返回 False
如果认为合适可以就地修改记录
"""
return all(map(lambda func: func(record), self._filter_list))

269
src/pg_logger/_style.py Normal file
View File

@ -0,0 +1,269 @@
from typing import Dict
from pygments.style import Style as PyStyle
from pygments.token import (
Comment,
Error,
Generic,
Keyword,
Literal,
Name,
Number,
Operator,
Punctuation,
String,
Text,
)
from rich.style import Style
__all__ = [
"MonokaiProStyle",
"DEFAULT_STYLE",
"BACKGROUND",
"FOREGROUND",
"BLACK",
"DARK_GREY",
"LIGHT_GREY",
"GREY",
"RED",
"MAGENTA",
"GREEN",
"YELLOW",
"ORANGE",
"PURPLE",
"BLUE",
"CYAN",
"WHITE",
]
BACKGROUND = "#272822"
FOREGROUND = "#f8f8f2"
BLACK = "#1A1A1A"
DARK_GREY = "#363537"
LIGHT_GREY = "#69676c"
GREY = "#595959"
RED = "#ff6188"
MAGENTA = "#FC61D3"
GREEN = "#7bd88f"
YELLOW = "#ffd866"
ORANGE = "#fc9867"
PURPLE = "#ab9df2"
BLUE = "#81a1c1"
CYAN = "#78dce8"
WHITE = "#e5e9f0"
class MonokaiProStyle(PyStyle):
background_color = DARK_GREY
highlight_color = "#49483e"
styles = {
# No corresponding class for the following:
Text: WHITE, # class: ''
Error: "#fc618d bg:#1e0010", # class: 'err'
Comment: LIGHT_GREY, # class: 'c'
Comment.Multiline: YELLOW, # class: 'cm'
Keyword: RED, # class: 'k'
Keyword.Namespace: GREEN, # class: 'kn'
Operator: RED, # class: 'o'
Punctuation: WHITE, # class: 'p'
Name: WHITE, # class: 'n'
Name.Attribute: GREEN, # class: 'na' - to be revised
Name.Builtin: CYAN, # class: 'nb'
Name.Builtin.Pseudo: ORANGE, # class: 'bp'
Name.Class: GREEN, # class: 'nc' - to be revised
Name.Decorator: PURPLE, # class: 'nd' - to be revised
Name.Exception: GREEN, # class: 'ne'
Name.Function: GREEN, # class: 'nf'
Name.Property: ORANGE, # class: 'py'
Number: PURPLE, # class: 'm'
Literal: PURPLE, # class: 'l'
Literal.Date: ORANGE, # class: 'ld'
String: YELLOW, # class: 's'
String.Regex: ORANGE, # class: 'sr'
Generic.Deleted: YELLOW, # class: 'gd',
Generic.Emph: "italic", # class: 'ge'
Generic.Inserted: GREEN, # class: 'gi'
Generic.Strong: "bold", # class: 'gs'
Generic.Subheading: LIGHT_GREY, # class: 'gu'
}
DEFAULT_STYLE: Dict[str, Style] = {
# base
"none": Style.null(),
"reset": Style(
color=FOREGROUND,
bgcolor=BACKGROUND,
dim=False,
bold=False,
italic=False,
underline=False,
blink=False,
blink2=False,
reverse=False,
conceal=False,
strike=False,
),
"dim": Style(dim=True),
"bright": Style(dim=False),
"bold": Style(bold=True),
"strong": Style(bold=True),
"code": Style(reverse=True, bold=True),
"italic": Style(italic=True),
"emphasize": Style(italic=True),
"underline": Style(underline=True),
"blink": Style(blink=True),
"blink2": Style(blink2=True),
"reverse": Style(reverse=True),
"strike": Style(strike=True),
"black": Style(color=BLACK),
"red": Style(color=RED),
"green": Style(color=GREEN),
"yellow": Style(color=YELLOW),
"magenta": Style(color=MAGENTA),
"blue": Style(color=BLUE),
"cyan": Style(color=CYAN),
"white": Style(color=WHITE),
# inspect
"inspect.attr": Style(color=YELLOW, italic=True),
"inspect.attr.dunder": Style(color=YELLOW, italic=True, dim=True),
"inspect.callable": Style(bold=True, color=RED),
"inspect.def": Style(italic=True, color="bright_cyan"),
"inspect.class": Style(italic=True, color="bright_cyan"),
"inspect.error": Style(bold=True, color=RED),
"inspect.equals": Style(),
"inspect.help": Style(color=CYAN),
"inspect.doc": Style(dim=True),
"inspect.value.border": Style(color=GREEN),
# live
"live.ellipsis": Style(bold=True, color=RED),
# layout
"layout.tree.row": Style(dim=False, color=RED),
"layout.tree.column": Style(dim=False, color=BLUE),
# log
"logging.keyword": Style(bold=True, color=ORANGE),
"logging.level.notset": Style(color=DARK_GREY, dim=True),
"logging.level.trace": Style(color=GREY),
"logging.level.debug": Style(color=LIGHT_GREY, bold=True),
"logging.level.info": Style(color="white"),
"logging.level.plugin": Style(color="cyan"),
"logging.level.success": Style(color="green"),
"logging.level.warning": Style(color="yellow"),
"logging.level.error": Style(color="red"),
"logging.level.critical": Style(color="red", bgcolor="#1e0010", bold=True),
"log.level": Style.null(),
"log.time": Style(color=CYAN, dim=True),
"log.message": Style.null(),
"log.path": Style(dim=True),
"log.line_no": Style(color=CYAN, bold=True, italic=False, dim=True),
# repr
"repr.ellipsis": Style(color=YELLOW),
"repr.indent": Style(color=GREEN, dim=True),
"repr.error": Style(color=RED, bold=True),
"repr.str": Style(color=GREEN, italic=False, bold=False),
"repr.brace": Style(bold=True),
"repr.comma": Style(bold=True),
"repr.ipv4": Style(bold=True, color="bright_green"),
"repr.ipv6": Style(bold=True, color="bright_green"),
"repr.eui48": Style(bold=True, color="bright_green"),
"repr.eui64": Style(bold=True, color="bright_green"),
"repr.tag_start": Style(bold=True),
"repr.tag_name": Style(color="bright_magenta", bold=True),
"repr.tag_contents": Style(color="default"),
"repr.tag_end": Style(bold=True),
"repr.attrib_name": Style(color=YELLOW, italic=False),
"repr.attrib_equal": Style(bold=True),
"repr.attrib_value": Style(color=MAGENTA, italic=False),
"repr.number": Style(color=CYAN, bold=True, italic=False),
"repr.number_complex": Style(color=CYAN, bold=True, italic=False), # same
"repr.bool_true": Style(color="bright_green", italic=True),
"repr.bool_false": Style(color="bright_red", italic=True),
"repr.none": Style(color=MAGENTA, italic=True),
"repr.url": Style(underline=True, color="bright_blue", italic=False, bold=False),
"repr.uuid": Style(color="bright_yellow", bold=False),
"repr.call": Style(color=MAGENTA, bold=True),
"repr.path": Style(color=MAGENTA),
"repr.filename": Style(color="bright_magenta"),
"rule.line": Style(color="bright_green"),
"rule.text": Style.null(),
# json
"json.brace": Style(bold=True),
"json.bool_true": Style(color="bright_green", italic=True),
"json.bool_false": Style(color="bright_red", italic=True),
"json.null": Style(color=MAGENTA, italic=True),
"json.number": Style(color=CYAN, bold=True, italic=False),
"json.str": Style(color=GREEN, italic=False, bold=False),
"json.key": Style(color=BLUE, bold=True),
# prompt
"prompt": Style.null(),
"prompt.choices": Style(color=MAGENTA, bold=True),
"prompt.default": Style(color=CYAN, bold=True),
"prompt.invalid": Style(color=RED),
"prompt.invalid.choice": Style(color=RED),
# pretty
"pretty": Style.null(),
# scope
"scope.border": Style(color=BLUE),
"scope.key": Style(color=YELLOW, italic=True),
"scope.key.special": Style(color=YELLOW, italic=True, dim=True),
"scope.equals": Style(color=RED),
# table
"table.header": Style(bold=True),
"table.footer": Style(bold=True),
"table.cell": Style.null(),
"table.title": Style(italic=True),
"table.caption": Style(italic=True, dim=True),
# traceback
"traceback.error": Style(color=RED, italic=True),
"traceback.border.syntax_error": Style(color="bright_red"),
"traceback.border": Style(color=RED),
"traceback.text": Style.null(),
"traceback.title": Style(color=RED, bold=True),
"traceback.exc_type": Style(color="bright_red", bold=True),
"traceback.exc_value": Style.null(),
"traceback.offset": Style(color="bright_red", bold=True),
# bar
"bar.back": Style(color="grey23"),
"bar.complete": Style(color="rgb(249,38,114)"),
"bar.finished": Style(color="rgb(114,156,31)"),
"bar.pulse": Style(color="rgb(249,38,114)"),
# progress
"progress.description": Style.null(),
"progress.filesize": Style(color=GREEN),
"progress.filesize.total": Style(color=GREEN),
"progress.download": Style(color=GREEN),
"progress.elapsed": Style(color=YELLOW),
"progress.percentage": Style(color=MAGENTA),
"progress.remaining": Style(color=CYAN),
"progress.data.speed": Style(color=RED),
"progress.spinner": Style(color=GREEN),
"status.spinner": Style(color=GREEN),
# tree
"tree": Style(),
"tree.line": Style(),
# markdown
"markdown.paragraph": Style(),
"markdown.text": Style(),
"markdown.emph": Style(italic=True),
"markdown.strong": Style(bold=True),
"markdown.code": Style(bgcolor=BLACK, color="bright_white"),
"markdown.code_block": Style(dim=True, color=CYAN, bgcolor=BLACK),
"markdown.block_quote": Style(color=MAGENTA),
"markdown.list": Style(color=CYAN),
"markdown.item": Style(),
"markdown.item.bullet": Style(color=YELLOW, bold=True),
"markdown.item.number": Style(color=YELLOW, bold=True),
"markdown.hr": Style(color=YELLOW),
"markdown.h1.border": Style(),
"markdown.h1": Style(bold=True),
"markdown.h2": Style(bold=True, underline=True),
"markdown.h3": Style(bold=True),
"markdown.h4": Style(bold=True, dim=True),
"markdown.h5": Style(underline=True),
"markdown.h6": Style(italic=True),
"markdown.h7": Style(italic=True, dim=True),
"markdown.link": Style(color="bright_blue"),
"markdown.link_url": Style(color=BLUE),
}

372
src/pg_logger/_traceback.py Normal file
View File

@ -0,0 +1,372 @@
import os
import traceback as traceback_
from types import (
ModuleType,
TracebackType,
)
from typing import (
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
TYPE_CHECKING,
Tuple,
Type,
Union,
)
from rich import pretty
from rich.columns import Columns
from rich.console import (
RenderResult,
group,
)
from rich.highlighter import ReprHighlighter
from rich.panel import Panel
from rich.pretty import Pretty
from rich.syntax import (
PygmentsSyntaxTheme,
Syntax,
)
from rich.table import Table
from rich.text import (
Text,
TextType,
)
from rich.traceback import (
Frame,
PathHighlighter,
Stack,
Trace,
Traceback as BaseTraceback,
)
from pg_logger._style import MonokaiProStyle
if TYPE_CHECKING:
from rich.console import ConsoleRenderable # pylint: disable=W0611
__all__ = ["render_scope", "Traceback"]
def render_scope(
scope: Mapping[str, Any],
*,
title: Optional[TextType] = None,
sort_keys: bool = False,
indent_guides: bool = False,
max_length: Optional[int] = None,
max_string: Optional[int] = None,
max_depth: Optional[int] = None,
) -> "ConsoleRenderable":
"""在给定范围内渲染 python 变量
Args:
scope (Mapping): 包含变量名称和值的映射.
title (str, optional): 标题. 默认为 None.
sort_keys (bool, optional): 启用排序. 默认为 True.
indent_guides (bool, optional): 启用缩进线. 默认为 False.
max_length (int, optional): 缩写前容器的最大长度; 若为 None , 则表示没有缩写. 默认为 None.
max_string (int, optional): 截断前字符串的最大长度; 若为 None , 则表示不会截断. 默认为 None.
max_depth (int, optional): 嵌套数据结构的最大深度; 若为 None , 则表示会一直递归访问至最后一层. 默认为 None.
Returns:
ConsoleRenderable: 可被 rich 渲染的对象.
"""
highlighter = ReprHighlighter()
items_table = Table.grid(padding=(0, 1), expand=False)
items_table.add_column(justify="right")
def sort_items(item: Tuple[str, Any]) -> Tuple[bool, str]:
# noinspection PyShadowingNames
key, _ = item
return not key.startswith("__"), key.lower()
# noinspection PyTypeChecker
items = sorted(scope.items(), key=sort_items) if sort_keys else scope.items()
for key, value in items:
key_text = Text.assemble(
(key, "scope.key.special" if key.startswith("__") else "scope.key"),
(" =", "scope.equals"),
)
items_table.add_row(
key_text,
Pretty(
value,
highlighter=highlighter,
indent_guides=indent_guides,
max_length=max_length,
max_string=max_string,
max_depth=max_depth,
),
)
return Panel.fit(
items_table,
title=title,
border_style="scope.border",
padding=(0, 1),
)
class Traceback(BaseTraceback):
locals_max_depth: Optional[int]
def __init__(self, *args, locals_max_depth: Optional[int] = None, **kwargs):
kwargs.update({"show_locals": True})
super(Traceback, self).__init__(*args, **kwargs)
self.locals_max_depth = locals_max_depth
@classmethod
def from_exception(
cls,
exc_type: Type[BaseException],
exc_value: BaseException,
traceback: Optional[TracebackType],
width: Optional[int] = 100,
extra_lines: int = 3,
theme: Optional[str] = None,
word_wrap: bool = False,
show_locals: bool = True,
indent_guides: bool = True,
locals_max_length: int = 10,
locals_max_string: int = 80,
locals_max_depth: Optional[int] = None,
suppress: Iterable[Union[str, ModuleType]] = (),
max_frames: int = 100,
**kwargs,
) -> "Traceback":
rich_traceback = cls.extract(
exc_type=exc_type,
exc_value=exc_value,
traceback=traceback,
show_locals=show_locals,
locals_max_depth=locals_max_depth,
locals_max_string=locals_max_string,
locals_max_length=locals_max_length,
)
return cls(
rich_traceback,
width=width,
extra_lines=extra_lines,
theme=PygmentsSyntaxTheme(MonokaiProStyle),
word_wrap=word_wrap,
show_locals=show_locals,
indent_guides=indent_guides,
locals_max_length=locals_max_length,
locals_max_string=locals_max_string,
locals_max_depth=locals_max_depth,
suppress=suppress,
max_frames=max_frames,
)
@classmethod
def extract(
cls,
exc_type: Type[BaseException],
exc_value: BaseException,
traceback: Optional[TracebackType],
show_locals: bool = False,
locals_max_length: int = 10,
locals_max_string: int = 80,
locals_max_depth: Optional[int] = None,
**kwargs,
) -> Trace:
# noinspection PyProtectedMember
from rich import _IMPORT_CWD
stacks: List[Stack] = []
is_cause = False
def safe_str(_object: Any) -> str:
# noinspection PyBroadException
try:
return str(_object)
except Exception: # pylint: disable=W0703
return "<exception str() failed>"
while True:
stack = Stack(
exc_type=safe_str(exc_type.__name__),
exc_value=safe_str(exc_value),
is_cause=is_cause,
)
if isinstance(exc_value, SyntaxError):
# noinspection PyProtectedMember
from rich.traceback import _SyntaxError
stack.syntax_error = _SyntaxError(
offset=exc_value.offset or 0,
filename=exc_value.filename or "?",
lineno=exc_value.lineno or 0,
line=exc_value.text or "",
msg=exc_value.msg,
)
stacks.append(stack)
append = stack.frames.append
for frame_summary, line_no in traceback_.walk_tb(traceback):
filename = frame_summary.f_code.co_filename
if (
filename
and not filename.startswith("<")
and not os.path.isabs(filename)
):
filename = os.path.join(_IMPORT_CWD, filename)
if frame_summary.f_locals.get("_rich_traceback_omit", False):
continue
frame = Frame(
filename=filename or "?",
lineno=line_no,
name=frame_summary.f_code.co_name,
locals={
key: pretty.traverse(
value,
max_length=locals_max_length,
max_string=locals_max_string,
max_depth=locals_max_depth,
)
for key, value in frame_summary.f_locals.items()
}
if show_locals
else None,
)
append(frame)
if frame_summary.f_locals.get("_rich_traceback_guard", False):
del stack.frames[:]
cause = getattr(exc_value, "__cause__", None)
if cause:
exc_type = cause.__class__
exc_value = cause
# __traceback__ can be None, e.g. for exceptions raised by the
# 'multiprocessing' module
traceback = cause.__traceback__
is_cause = True
continue
cause = exc_value.__context__
if cause and not getattr(exc_value, "__suppress_context__", False):
exc_type = cause.__class__
exc_value = cause
traceback = cause.__traceback__
is_cause = False
continue
break
trace = Trace(stacks=stacks)
return trace
@group()
def _render_stack(self, stack: Stack) -> RenderResult:
path_highlighter = PathHighlighter()
theme = self.theme
code_cache: Dict[str, str] = {}
# noinspection PyShadowingNames
def read_code(filename: str) -> str:
code = code_cache.get(filename)
if code is None:
with open(
filename, "rt", encoding="utf-8", errors="replace"
) as code_file:
code = code_file.read()
code_cache[filename] = code
return code
# noinspection PyShadowingNames
def render_locals(frame: Frame) -> Iterable["ConsoleRenderable"]:
if frame.locals:
yield render_scope(
scope=frame.locals,
title="locals",
indent_guides=self.indent_guides,
max_length=self.locals_max_length,
max_string=self.locals_max_string,
max_depth=self.locals_max_depth,
)
exclude_frames: Optional[range] = None
if self.max_frames != 0:
exclude_frames = range(
self.max_frames // 2,
len(stack.frames) - self.max_frames // 2,
)
excluded = False
for frame_index, frame in enumerate(stack.frames):
if exclude_frames and frame_index in exclude_frames:
excluded = True
continue
if excluded:
if exclude_frames is None:
raise ValueError(exclude_frames)
yield Text(
f"\n... {len(exclude_frames)} frames hidden ...",
justify="center",
style="traceback.error",
)
excluded = False
first = frame_index == 0
frame_filename = frame.filename
suppressed = any(frame_filename.startswith(path) for path in self.suppress)
text = Text.assemble(
path_highlighter(Text(frame.filename, style="pygments.string")),
(":", "pygments.text"),
(str(frame.lineno), "pygments.number"),
" in ",
(frame.name, "pygments.function"),
style="pygments.text",
)
if not frame.filename.startswith("<") and not first:
yield ""
yield text
if frame.filename.startswith("<"):
yield from render_locals(frame)
continue
if not suppressed:
try:
if self.width is not None:
code_width = self.width - 5
else:
code_width = 100
code = read_code(frame.filename)
lexer_name = self._guess_lexer(frame.filename, code)
syntax = Syntax(
code,
lexer_name,
theme=theme,
line_numbers=True,
line_range=(
frame.lineno - self.extra_lines,
frame.lineno + self.extra_lines,
),
highlight_lines={frame.lineno},
word_wrap=self.word_wrap,
code_width=code_width,
indent_guides=self.indent_guides,
dedent=False,
)
yield ""
except Exception as error: # pylint: disable=W0703
yield Text.assemble(
(f"\n{error}", "traceback.error"),
)
else:
yield (
Columns(
[
syntax,
*render_locals(frame),
],
padding=1,
)
if frame.locals
else syntax
)

View File

@ -0,0 +1,5 @@
from typing import Literal
__all__ = ("COLOR_SYSTEM",)
COLOR_SYSTEM = Literal["auto", "standard", "256", "truecolor", "windows"]