PaiGram/utils/log/_logger.py

483 lines
16 KiB
Python
Raw Normal View History

import logging
import os
import sys
from datetime import datetime
from multiprocessing import RLock as Lock
from pathlib import Path
from typing import (Any, Callable, Dict, Iterable, List, Literal, Mapping, Optional, TYPE_CHECKING, Union)
import ujson as json
from rich.columns import Columns
from rich.console import (
Console,
RenderResult,
group,
)
from rich.logging import (
LogRender as DefaultLogRender,
RichHandler as DefaultRichHandler,
)
from rich.syntax import (
PygmentsSyntaxTheme,
Syntax,
)
from rich.text import (
Text,
TextType,
)
from rich.theme import Theme
from rich.traceback import (
Stack,
Traceback as BaseTraceback,
)
from ujson import JSONDecodeError
from core.config import BotConfig
from utils.const import NOT_SET, PROJECT_ROOT
from utils.log._file import FileIO
from utils.log._style import (
DEFAULT_STYLE,
MonokaiProStyle,
)
from utils.typedefs import ExceptionInfoType
if TYPE_CHECKING:
from rich.table import Table # pylint: disable=unused-import
from rich.console import ( # pylint: disable=unused-import
ConsoleRenderable,
RenderableType,
)
from rich.console import ( # pylint: disable=unused-import
ConsoleRenderable,
RenderableType,
)
from rich.table import Table # pylint: disable=unused-import
from logging import LogRecord # pylint: disable=unused-import
__all__ = ["logger"]
_lock = Lock()
__initialized__ = False
FormatTimeCallable = Callable[[datetime], Text]
config = BotConfig()
logging.addLevelName(5, 'TRACE')
logging.addLevelName(25, 'SUCCESS')
color_system: Literal['windows', 'truecolor']
if sys.platform == 'win32':
color_system = 'windows'
else:
color_system = 'truecolor'
# noinspection SpellCheckingInspection
log_console = Console(
color_system=color_system, theme=Theme(DEFAULT_STYLE), width=config.logger.width
)
class Traceback(BaseTraceback):
def __init__(self, *args, **kwargs):
kwargs.update({'show_locals': True, 'max_frames': config.logger.traceback_max_frames})
super(Traceback, self).__init__(*args, **kwargs)
self.theme = PygmentsSyntaxTheme(MonokaiProStyle)
@group()
def _render_stack(self, stack: Stack) -> RenderResult:
from rich.traceback import (
PathHighlighter,
Frame,
)
path_highlighter = PathHighlighter()
theme = self.theme
code_cache: Dict[str, str] = {}
# noinspection PyShadowingNames
def read_code(filename: str) -> str:
code = code_cache.get(filename)
if code is None:
with open(
filename, "rt", encoding="utf-8", errors="replace"
) as code_file:
code = code_file.read()
code_cache[filename] = code
return code
# noinspection PyShadowingNames
def render_locals(frame: Frame) -> Iterable["ConsoleRenderable"]:
if frame.locals:
from rich.scope import render_scope
yield render_scope(
frame.locals,
title="locals",
indent_guides=self.indent_guides,
max_length=self.locals_max_length,
max_string=self.locals_max_string,
)
exclude_frames: Optional[range] = None
if self.max_frames != 0:
exclude_frames = range(
self.max_frames // 2,
len(stack.frames) - self.max_frames // 2,
)
excluded = False
for frame_index, frame in enumerate(stack.frames):
if exclude_frames and frame_index in exclude_frames:
excluded = True
continue
if excluded:
if exclude_frames is None:
raise ValueError(exclude_frames)
yield Text(
f"\n... {len(exclude_frames)} frames hidden ...",
justify="center",
style="traceback.error",
)
excluded = False
first = frame_index == 0
frame_filename = frame.filename
suppressed = any(
frame_filename.startswith(path) for path in self.suppress)
text = Text.assemble(
path_highlighter(Text(frame.filename, style="pygments.string")),
(":", "pygments.text"),
(str(frame.lineno), "pygments.number"),
" in ",
(frame.name, "pygments.function"),
style="pygments.text",
)
if not frame.filename.startswith("<") and not first:
yield ""
yield text
if frame.filename.startswith("<"):
yield from render_locals(frame)
continue
if not suppressed:
try:
if self.width is not None:
code_width = self.width - 5
else:
code_width = 100
code = read_code(frame.filename)
lexer_name = self._guess_lexer(frame.filename, code)
syntax = Syntax(
code,
lexer_name,
theme=theme,
line_numbers=True,
line_range=(
frame.lineno - self.extra_lines,
frame.lineno + self.extra_lines,
),
highlight_lines={frame.lineno},
word_wrap=self.word_wrap,
code_width=code_width,
indent_guides=self.indent_guides,
dedent=False,
)
yield ""
except Exception as error: # pylint: disable=W0703
yield Text.assemble(
(f"\n{error}", "traceback.error"),
)
else:
yield (
Columns(
[
syntax,
*render_locals(frame),
],
padding=1,
)
if frame.locals
else syntax
)
class LogRender(DefaultLogRender):
@property
def last_time(self):
return self._last_time
@last_time.setter
def last_time(self, last_time):
self._last_time = last_time
def __init__(self, *args, **kwargs):
super(LogRender, self).__init__(*args, **kwargs)
self.show_level = True
self.time_format = config.logger.time_format
def __call__(
self,
console: "Console",
renderables: Iterable["ConsoleRenderable"],
log_time: Optional[datetime] = None,
time_format: Optional[Union[str, FormatTimeCallable]] = None,
level: TextType = "",
path: Optional[str] = None,
line_no: Optional[int] = None,
link_path: Optional[str] = None,
) -> "Table":
from rich.containers import Renderables
from rich.table import Table
output = Table.grid(padding=(0, 1))
output.expand = True
output.add_column(style="log.time")
output.add_column(style="log.level", width=self.level_width)
output.add_column(ratio=1, style="log.message", overflow="fold")
if path:
output.add_column(style="log.path")
if line_no:
output.add_column(style='log.line_no', width=4)
row: List["RenderableType"] = []
if self.show_time:
log_time = log_time or log_console.get_datetime()
time_format = time_format or self.time_format
if callable(time_format):
log_time_display = time_format(log_time)
else:
log_time_display = Text(log_time.strftime(time_format))
if log_time_display == self.last_time and self.omit_repeated_times:
row.append(Text(" " * len(log_time_display)))
else:
row.append(log_time_display)
self.last_time = log_time_display
if self.show_level:
row.append(level)
row.append(Renderables(renderables))
if path:
path_text = Text()
path_text.append(
path, style=f"link file://{link_path}" if link_path else ""
)
row.append(path_text)
line_no_text = Text()
line_no_text.append(
str(line_no), style=f"link file://{link_path}#{line_no}" if link_path else ""
)
row.append(line_no_text)
output.add_row(*row)
return output
class Handler(DefaultRichHandler):
def __init__(self, *args, **kwargs):
super(Handler, self).__init__(*args, **kwargs)
self._log_render = LogRender()
self.console = log_console
self.rich_tracebacks = True
self.tracebacks_show_locals = True
self.keywords = self.KEYWORDS + config.logger.render_keywords
def render(
self,
*,
record: "LogRecord",
traceback: Optional[Traceback],
message_renderable: Optional["ConsoleRenderable"],
) -> "ConsoleRenderable":
if record.pathname != '<input>':
try:
path = str(Path(record.pathname).relative_to(PROJECT_ROOT))
path = path.split('.')[0].replace(os.sep, '.')
except ValueError:
import site
path = None
for s in site.getsitepackages():
try:
path = str(Path(record.pathname).relative_to(Path(s)))
break
except ValueError:
continue
if path is None:
path = "<SITE>"
else:
path = path.split('.')[0].replace(os.sep, '.')
else:
path = '<INPUT>'
path = path.replace('lib.site-packages.', '')
_level = self.get_level_text(record)
time_format = None if self.formatter is None else self.formatter.datefmt
log_time = datetime.fromtimestamp(record.created)
log_renderable = self._log_render(
self.console,
(
[message_renderable]
if not traceback else
(
[message_renderable, traceback]
if message_renderable is not None else
[traceback]
)
),
log_time=log_time,
time_format=time_format,
level=_level,
path=path,
link_path=record.pathname if self.enable_link_path else None,
line_no=record.lineno,
)
return log_renderable
def render_message(
self, record: "LogRecord", message: Any
) -> "ConsoleRenderable":
use_markup = getattr(record, "markup", self.markup)
if isinstance(message, str):
message_text = (
Text.from_markup(message) if use_markup else Text(message)
)
highlighter = getattr(record, "highlighter", self.highlighter)
else:
from rich.highlighter import JSONHighlighter
from rich.json import JSON
highlighter = JSONHighlighter()
message_text = JSON.from_data(message, indent=4).text
if highlighter is not None:
# noinspection PyCallingNonCallable
message_text = highlighter(message_text)
if self.keywords is None:
self.keywords = self.KEYWORDS
if self.keywords:
message_text.highlight_words(self.keywords, "logging.keyword")
return message_text
def emit(self, record: "LogRecord") -> None:
message = self.format(record)
traceback = None
if (
self.rich_tracebacks
and record.exc_info
and record.exc_info != (None, None, None)
):
exc_type, exc_value, exc_traceback = record.exc_info
if exc_type is None or exc_value is None:
raise ValueError(record)
traceback = Traceback.from_exception(
exc_type,
exc_value,
exc_traceback,
width=self.tracebacks_width,
extra_lines=self.tracebacks_extra_lines,
word_wrap=self.tracebacks_word_wrap,
show_locals=self.tracebacks_show_locals,
locals_max_length=self.locals_max_length,
locals_max_string=self.locals_max_string,
suppress=self.tracebacks_suppress,
)
message = record.getMessage()
if self.formatter:
record.message = record.getMessage()
formatter = self.formatter
if hasattr(formatter, "usesTime") and formatter.usesTime():
record.asctime = formatter.formatTime(record, formatter.datefmt)
message = formatter.formatMessage(record)
if message == str(exc_value):
message = None
if message is not None:
try:
message_renderable = self.render_message(record, json.loads(message))
except JSONDecodeError:
message_renderable = self.render_message(record, message)
else:
message_renderable = None
log_renderable = self.render(record=record, traceback=traceback, message_renderable=message_renderable)
# noinspection PyBroadException
try:
self.console.print(log_renderable)
except Exception: # pylint: disable=W0703
self.handleError(record)
2022-09-10 14:46:08 +00:00
class FileHandler(Handler):
def __init__(self, *args, path: Path, **kwargs):
super().__init__(*args, **kwargs)
while True:
try:
path.parent.mkdir(exist_ok=True)
break
except FileNotFoundError:
parent = path.parent
while True:
try:
parent.mkdir(exist_ok=True)
break
except FileNotFoundError:
parent = parent.parent
path.parent.mkdir(exist_ok=True)
2022-09-10 14:46:08 +00:00
self.console = Console(width=180, file=FileIO(path), theme=Theme(DEFAULT_STYLE))
class Logger(logging.Logger):
def success(
self,
msg: Any, *args: Any,
exc_info: Optional[ExceptionInfoType] = None,
stack_info: bool = False,
stacklevel: int = 1,
extra: Optional[Mapping[str, Any]] = None
) -> None:
return self.log(25, msg, *args, exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra)
def exception(
self,
msg: Any = NOT_SET,
2022-09-10 14:46:08 +00:00
*args: Any,
exc_info: Optional[ExceptionInfoType] = True,
stack_info: bool = False,
stacklevel: int = 1,
extra: Optional[Mapping[str, Any]] = None,
**kwargs
) -> None:
super(Logger, self).exception(
"" if msg is NOT_SET else msg, *args,
exc_info=exc_info, stack_info=stack_info, stacklevel=stacklevel, extra=extra
)
with _lock:
if not __initialized__:
if "PYCHARM_HOSTED" in os.environ:
print() # 针对 pycharm 的控制台 bug
logging.captureWarnings(True)
2022-09-10 14:46:08 +00:00
handler, debug_handler, error_handler = (
Handler(locals_max_length=4),
FileHandler(level=10, path=config.logger.path.joinpath("debug/debug.log")),
FileHandler(level=40, path=config.logger.path.joinpath("error/error.log"))
2022-09-10 14:46:08 +00:00
)
level_ = 10 if config.debug else 20
logging.basicConfig(
level=10 if config.debug else 20,
format="%(message)s",
datefmt="[%Y-%m-%d %X]",
handlers=[handler, debug_handler, error_handler]
)
warnings_logger = logging.getLogger("py.warnings")
warnings_logger.addHandler(handler)
warnings_logger.addHandler(debug_handler)
logger = Logger('TGPaimon', level_)
logger.addHandler(handler)
logger.addHandler(debug_handler)
logger.addHandler(error_handler)
__initialized__ = True