diff --git a/mitmproxy/addons/dumper.py b/mitmproxy/addons/dumper.py index 3f14032d7..726a0edf5 100644 --- a/mitmproxy/addons/dumper.py +++ b/mitmproxy/addons/dumper.py @@ -15,6 +15,7 @@ from mitmproxy.contrib import click as miniclick from mitmproxy.tcp import TCPFlow, TCPMessage from mitmproxy.utils import human from mitmproxy.utils import strutils +from mitmproxy.utils import vt_codes from mitmproxy.websocket import WebSocketData, WebSocketMessage @@ -36,7 +37,7 @@ class Dumper: def __init__(self, outfile: Optional[IO[str]] = None): self.filter: Optional[flowfilter.TFilter] = None self.outfp: IO[str] = outfile or sys.stdout - self.isatty = self.outfp.isatty() + self.out_has_vt_codes = vt_codes.ensure_supported(self.outfp) def load(self, loader): loader.add_option( @@ -71,7 +72,7 @@ class Dumper: self.filter = None def style(self, text: str, **style) -> str: - if style and self.isatty: + if style and self.out_has_vt_codes: text = miniclick.style(text, **style) return text diff --git a/mitmproxy/addons/termlog.py b/mitmproxy/addons/termlog.py index b5a3ed3f5..d6d134ddf 100644 --- a/mitmproxy/addons/termlog.py +++ b/mitmproxy/addons/termlog.py @@ -4,6 +4,7 @@ from typing import IO, Optional from mitmproxy import ctx from mitmproxy import log from mitmproxy.contrib import click as miniclick +from mitmproxy.utils import vt_codes LOG_COLORS = {'error': "red", 'warn': "yellow", 'alert': "magenta"} @@ -15,9 +16,9 @@ class TermLog: err: Optional[IO[str]] = None, ): self.out_file: IO[str] = out or sys.stdout - self.out_isatty = self.out_file.isatty() + self.out_has_vt_codes = vt_codes.ensure_supported(self.out_file) self.err_file: IO[str] = err or sys.stderr - self.err_isatty = self.err_file.isatty() + self.err_has_vt_codes = vt_codes.ensure_supported(self.err_file) def load(self, loader): loader.add_option( @@ -30,13 +31,13 @@ class TermLog: if log.log_tier(ctx.options.termlog_verbosity) >= log.log_tier(e.level): if e.level == "error": f = self.err_file - isatty = self.err_isatty + has_vt_codes = self.err_has_vt_codes else: f = self.out_file - isatty = self.out_isatty + has_vt_codes = self.out_has_vt_codes msg = e.msg - if isatty: + if has_vt_codes: msg = miniclick.style( e.msg, fg=LOG_COLORS.get(e.level), diff --git a/mitmproxy/contrib/urwid/win32.py b/mitmproxy/contrib/urwid/win32.py index 8724f5fda..581fcdfd2 100644 --- a/mitmproxy/contrib/urwid/win32.py +++ b/mitmproxy/contrib/urwid/win32.py @@ -4,6 +4,7 @@ from ctypes.wintypes import BOOL, DWORD, WCHAR, WORD, SHORT, UINT, HANDLE, LPDWO # https://docs.microsoft.com/de-de/windows/console/getstdhandle STD_INPUT_HANDLE = -10 STD_OUTPUT_HANDLE = -11 +STD_ERROR_HANDLE = -12 # https://docs.microsoft.com/de-de/windows/console/setconsolemode ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 diff --git a/mitmproxy/utils/vt_codes.py b/mitmproxy/utils/vt_codes.py new file mode 100644 index 000000000..591b72e33 --- /dev/null +++ b/mitmproxy/utils/vt_codes.py @@ -0,0 +1,51 @@ +""" +This module provides a method to detect if a given file object supports virtual terminal escape codes. +""" +import os +import sys +from typing import IO + +if os.name == "nt": + from ctypes import byref, windll # type: ignore + from ctypes.wintypes import BOOL, DWORD, HANDLE, LPDWORD + + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + STD_OUTPUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + + # https://docs.microsoft.com/de-de/windows/console/getstdhandle + GetStdHandle = windll.kernel32.GetStdHandle + GetStdHandle.argtypes = [DWORD] + GetStdHandle.restype = HANDLE + + # https://docs.microsoft.com/de-de/windows/console/getconsolemode + GetConsoleMode = windll.kernel32.GetConsoleMode + GetConsoleMode.argtypes = [HANDLE, LPDWORD] + GetConsoleMode.restype = BOOL + + # https://docs.microsoft.com/de-de/windows/console/setconsolemode + SetConsoleMode = windll.kernel32.SetConsoleMode + SetConsoleMode.argtypes = [HANDLE, DWORD] + SetConsoleMode.restype = BOOL + + def ensure_supported(f: IO[str]) -> bool: + if not f.isatty(): + return False + if f == sys.stdout: + h = STD_OUTPUT_HANDLE + elif f == sys.stderr: + h = STD_ERROR_HANDLE + else: + return False + + handle = GetStdHandle(h) + console_mode = DWORD() + ok = GetConsoleMode(handle, byref(console_mode)) + if not ok: + return False + + ok = SetConsoleMode(handle, console_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) + return ok +else: + def ensure_supported(f: IO[str]) -> bool: + return f.isatty() diff --git a/setup.cfg b/setup.cfg index 746ff90a5..1d38e5198 100644 --- a/setup.cfg +++ b/setup.cfg @@ -70,4 +70,5 @@ exclude = mitmproxy/proxy/server.py mitmproxy/proxy/layers/tls.py mitmproxy/utils/bits.py + mitmproxy/utils/vt_codes.py release/hooks diff --git a/test/mitmproxy/addons/test_dumper.py b/test/mitmproxy/addons/test_dumper.py index c046a1ea0..c8879eee0 100644 --- a/test/mitmproxy/addons/test_dumper.py +++ b/test/mitmproxy/addons/test_dumper.py @@ -243,9 +243,9 @@ def test_http2(): def test_styling(): sio = io.StringIO() - sio.isatty = lambda: True d = dumper.Dumper(sio) + d.out_has_vt_codes = True with taddons.context(d): d.response(tflow.tflow(resp=True)) assert "\x1b[" in sio.getvalue() diff --git a/test/mitmproxy/addons/test_termlog.py b/test/mitmproxy/addons/test_termlog.py index 6441f3a64..e6ea67eff 100644 --- a/test/mitmproxy/addons/test_termlog.py +++ b/test/mitmproxy/addons/test_termlog.py @@ -19,11 +19,10 @@ def test_output(capsys): assert err.strip().splitlines() == ["four"] -def test_styling() -> None: +def test_styling(monkeypatch) -> None: f = io.StringIO() - f.isatty = lambda: True t = termlog.TermLog(out=f) - + t.out_has_vt_codes = True with taddons.context(t) as tctx: tctx.configure(t) t.add_log(log.LogEntry("hello world", "info")) diff --git a/test/mitmproxy/utils/test_vt_codes.py b/test/mitmproxy/utils/test_vt_codes.py new file mode 100644 index 000000000..d8ac9cc42 --- /dev/null +++ b/test/mitmproxy/utils/test_vt_codes.py @@ -0,0 +1,7 @@ +import io + +from mitmproxy.utils.vt_codes import ensure_supported + + +def test_simple(): + assert not ensure_supported(io.StringIO())