diff --git a/mitmproxy/addons/dumper.py b/mitmproxy/addons/dumper.py index aeb4d2054..3f14032d7 100644 --- a/mitmproxy/addons/dumper.py +++ b/mitmproxy/addons/dumper.py @@ -1,20 +1,21 @@ import itertools import shutil +import sys from typing import IO, Optional, Union -import click +from wsproto.frame_protocol import CloseReason from mitmproxy import contentviews from mitmproxy import ctx from mitmproxy import exceptions +from mitmproxy import flow from mitmproxy import flowfilter from mitmproxy import http -from mitmproxy import flow +from mitmproxy.contrib import click as miniclick from mitmproxy.tcp import TCPFlow, TCPMessage from mitmproxy.utils import human from mitmproxy.utils import strutils -from mitmproxy.websocket import WebSocketMessage, WebSocketData -from wsproto.frame_protocol import CloseReason +from mitmproxy.websocket import WebSocketData, WebSocketMessage def indent(n: int, text: str) -> str: @@ -23,16 +24,19 @@ def indent(n: int, text: str) -> str: return "\n".join(pad + i for i in l) -def colorful(line, styles): - yield " " # we can already indent here - for (style, text) in line: - yield click.style(text, **styles.get(style, {})) +CONTENTVIEW_STYLES = { + "highlight": dict(bold=True), + "offset": dict(fg="blue"), + "header": dict(fg="green", bold=True), + "text": dict(fg="green"), +} class Dumper: - def __init__(self, outfile=None): + def __init__(self, outfile: Optional[IO[str]] = None): self.filter: Optional[flowfilter.TFilter] = None - self.outfp: Optional[IO] = outfile + self.outfp: IO[str] = outfile or sys.stdout + self.isatty = self.outfp.isatty() def load(self, loader): loader.add_option( @@ -66,29 +70,35 @@ class Dumper: else: self.filter = None + def style(self, text: str, **style) -> str: + if style and self.isatty: + text = miniclick.style(text, **style) + return text + def echo(self, text: str, ident=None, **style): if ident: text = indent(ident, text) - click.secho(text, file=self.outfp, err=False, **style) - if self.outfp: - self.outfp.flush() + text = self.style(text, **style) + print(text, file=self.outfp) def _echo_headers(self, headers: http.Headers): for k, v in headers.fields: ks = strutils.bytes_to_escaped_str(k) + ks = self.style(ks, fg='blue') vs = strutils.bytes_to_escaped_str(v) - out = "{}: {}".format( - click.style(ks, fg="blue"), - click.style(vs) - ) - self.echo(out, ident=4) + self.echo(f"{ks}: {vs}", ident=4) def _echo_trailers(self, trailers: Optional[http.Headers]): if not trailers: return - self.echo(click.style("--- HTTP Trailers", fg="magenta"), ident=4) + self.echo("--- HTTP Trailers", fg="magenta", ident=4) self._echo_headers(trailers) + def _colorful(self, line): + yield " " # we can already indent here + for (style, text) in line: + yield self.style(text, **CONTENTVIEW_STYLES.get(style, {})) + def _echo_message( self, message: Union[http.Message, TCPMessage, WebSocketMessage], @@ -107,15 +117,8 @@ class Dumper: else: lines_to_echo = lines - styles = dict( - highlight=dict(bold=True), - offset=dict(fg="blue"), - header=dict(fg="green", bold=True), - text=dict(fg="green") - ) - content = "\r\n".join( - "".join(colorful(line, styles)) for line in lines_to_echo + "".join(self._colorful(line)) for line in lines_to_echo ) if content: self.echo("") @@ -129,9 +132,9 @@ class Dumper: def _echo_request_line(self, flow: http.HTTPFlow) -> None: if flow.is_replay == "request": - client = click.style("[replay]", fg="yellow", bold=True) + client = self.style("[replay]", fg="yellow", bold=True) elif flow.client_conn.peername: - client = click.style( + client = self.style( strutils.escape_control_characters( human.format_address(flow.client_conn.peername) ) @@ -146,7 +149,7 @@ class Dumper: GET="green", DELETE="red" ).get(method.upper(), "magenta") - method = click.style( + method = self.style( strutils.escape_control_characters(method), fg=method_color, bold=True @@ -161,7 +164,7 @@ class Dumper: terminal_width_limit = max(shutil.get_terminal_size()[0] - 25, 50) if len(url) > terminal_width_limit: url = url[:terminal_width_limit] + "…" - url = click.style(strutils.escape_control_characters(url), bold=True) + url = self.style(strutils.escape_control_characters(url), bold=True) http_version = "" if ( @@ -176,7 +179,7 @@ class Dumper: def _echo_response_line(self, flow: http.HTTPFlow) -> None: if flow.is_replay == "response": replay_str = "[replay]" - replay = click.style(replay_str, fg="yellow", bold=True) + replay = self.style(replay_str, fg="yellow", bold=True) else: replay_str = "" replay = "" @@ -190,7 +193,7 @@ class Dumper: code_color = "magenta" elif 400 <= code_int < 600: code_color = "red" - code = click.style( + code = self.style( str(code_int), fg=code_color, bold=True, @@ -201,7 +204,7 @@ class Dumper: reason = flow.response.reason else: reason = http.status_codes.RESPONSES.get(flow.response.status_code, "") - reason = click.style( + reason = self.style( strutils.escape_control_characters(reason), fg=code_color, bold=True @@ -211,7 +214,7 @@ class Dumper: size = "(content missing)" else: size = human.pretty_size(len(flow.response.raw_content)) - size = click.style(size, bold=True) + size = self.style(size, bold=True) http_version = "" if ( @@ -221,7 +224,7 @@ class Dumper: # Hide version for h1 <-> h1 connections. http_version = f"{flow.response.http_version} " - arrows = click.style(" <<", bold=True) + arrows = self.style(" <<", bold=True) if ctx.options.flow_detail == 1: # This aligns the HTTP response code with the HTTP request method: # 127.0.0.1:59519: GET http://example.com/ @@ -255,6 +258,8 @@ class Dumper: msg = strutils.escape_control_characters(f.error.msg) self.echo(f" << {msg}", bold=True, fg="red") + self.outfp.flush() + def match(self, f): if ctx.options.flow_detail == 0: return False diff --git a/mitmproxy/addons/termlog.py b/mitmproxy/addons/termlog.py index b4ac300d8..b5a3ed3f5 100644 --- a/mitmproxy/addons/termlog.py +++ b/mitmproxy/addons/termlog.py @@ -1,14 +1,23 @@ +import sys from typing import IO, Optional -import click - -from mitmproxy import log from mitmproxy import ctx +from mitmproxy import log +from mitmproxy.contrib import click as miniclick + +LOG_COLORS = {'error': "red", 'warn': "yellow", 'alert': "magenta"} class TermLog: - def __init__(self, outfile=None): - self.outfile: Optional[IO] = outfile + def __init__( + self, + out: Optional[IO[str]] = None, + err: Optional[IO[str]] = None, + ): + self.out_file: IO[str] = out or sys.stdout + self.out_isatty = self.out_file.isatty() + self.err_file: IO[str] = err or sys.stderr + self.err_isatty = self.err_file.isatty() def load(self, loader): loader.add_option( @@ -17,13 +26,20 @@ class TermLog: choices=log.LogTierOrder ) - def add_log(self, e): + def add_log(self, e: log.LogEntry): if log.log_tier(ctx.options.termlog_verbosity) >= log.log_tier(e.level): - click.secho( - e.msg, - file=self.outfile, - fg=dict(error="red", warn="yellow", - alert="magenta").get(e.level), - dim=(e.level == "debug"), - err=(e.level == "error") - ) + if e.level == "error": + f = self.err_file + isatty = self.err_isatty + else: + f = self.out_file + isatty = self.out_isatty + + msg = e.msg + if isatty: + msg = miniclick.style( + e.msg, + fg=LOG_COLORS.get(e.level), + dim=(e.level == "debug"), + ) + print(msg, file=f) diff --git a/mitmproxy/contrib/click/__init__.py b/mitmproxy/contrib/click/__init__.py new file mode 100644 index 000000000..f32d0d964 --- /dev/null +++ b/mitmproxy/contrib/click/__init__.py @@ -0,0 +1,159 @@ +""" +SPDX-License-Identifier: BSD-3-Clause + +A vendored copy of click.style() @ 4f7b255 +""" +import typing as t + +_ansi_colors = { + "black": 30, + "red": 31, + "green": 32, + "yellow": 33, + "blue": 34, + "magenta": 35, + "cyan": 36, + "white": 37, + "reset": 39, + "bright_black": 90, + "bright_red": 91, + "bright_green": 92, + "bright_yellow": 93, + "bright_blue": 94, + "bright_magenta": 95, + "bright_cyan": 96, + "bright_white": 97, +} +_ansi_reset_all = "\033[0m" + + +def _interpret_color( + color: t.Union[int, t.Tuple[int, int, int], str], offset: int = 0 +) -> str: + if isinstance(color, int): + return f"{38 + offset};5;{color:d}" + + if isinstance(color, (tuple, list)): + r, g, b = color + return f"{38 + offset};2;{r:d};{g:d};{b:d}" + + return str(_ansi_colors[color] + offset) + + +def style( + text: t.Any, + fg: t.Optional[t.Union[int, t.Tuple[int, int, int], str]] = None, + bg: t.Optional[t.Union[int, t.Tuple[int, int, int], str]] = None, + bold: t.Optional[bool] = None, + dim: t.Optional[bool] = None, + underline: t.Optional[bool] = None, + overline: t.Optional[bool] = None, + italic: t.Optional[bool] = None, + blink: t.Optional[bool] = None, + reverse: t.Optional[bool] = None, + strikethrough: t.Optional[bool] = None, + reset: bool = True, +) -> str: + """Styles a text with ANSI styles and returns the new string. By + default the styling is self contained which means that at the end + of the string a reset code is issued. This can be prevented by + passing ``reset=False``. + Examples:: + click.echo(click.style('Hello World!', fg='green')) + click.echo(click.style('ATTENTION!', blink=True)) + click.echo(click.style('Some things', reverse=True, fg='cyan')) + click.echo(click.style('More colors', fg=(255, 12, 128), bg=117)) + Supported color names: + * ``black`` (might be a gray) + * ``red`` + * ``green`` + * ``yellow`` (might be an orange) + * ``blue`` + * ``magenta`` + * ``cyan`` + * ``white`` (might be light gray) + * ``bright_black`` + * ``bright_red`` + * ``bright_green`` + * ``bright_yellow`` + * ``bright_blue`` + * ``bright_magenta`` + * ``bright_cyan`` + * ``bright_white`` + * ``reset`` (reset the color code only) + If the terminal supports it, color may also be specified as: + - An integer in the interval [0, 255]. The terminal must support + 8-bit/256-color mode. + - An RGB tuple of three integers in [0, 255]. The terminal must + support 24-bit/true-color mode. + See https://en.wikipedia.org/wiki/ANSI_color and + https://gist.github.com/XVilka/8346728 for more information. + :param text: the string to style with ansi codes. + :param fg: if provided this will become the foreground color. + :param bg: if provided this will become the background color. + :param bold: if provided this will enable or disable bold mode. + :param dim: if provided this will enable or disable dim mode. This is + badly supported. + :param underline: if provided this will enable or disable underline. + :param overline: if provided this will enable or disable overline. + :param italic: if provided this will enable or disable italic. + :param blink: if provided this will enable or disable blinking. + :param reverse: if provided this will enable or disable inverse + rendering (foreground becomes background and the + other way round). + :param strikethrough: if provided this will enable or disable + striking through text. + :param reset: by default a reset-all code is added at the end of the + string which means that styles do not carry over. This + can be disabled to compose styles. + .. versionchanged:: 8.0 + A non-string ``message`` is converted to a string. + .. versionchanged:: 8.0 + Added support for 256 and RGB color codes. + .. versionchanged:: 8.0 + Added the ``strikethrough``, ``italic``, and ``overline`` + parameters. + .. versionchanged:: 7.0 + Added support for bright colors. + .. versionadded:: 2.0 + """ + if not isinstance(text, str): + text = str(text) + + bits = [] + + if fg: + try: + bits.append(f"\033[{_interpret_color(fg)}m") + except KeyError: + raise TypeError(f"Unknown color {fg!r}") from None + + if bg: + try: + bits.append(f"\033[{_interpret_color(bg, 10)}m") + except KeyError: + raise TypeError(f"Unknown color {bg!r}") from None + + if bold is not None: + bits.append(f"\033[{1 if bold else 22}m") + if dim is not None: + bits.append(f"\033[{2 if dim else 22}m") + if underline is not None: + bits.append(f"\033[{4 if underline else 24}m") + if overline is not None: + bits.append(f"\033[{53 if overline else 55}m") + if italic is not None: + bits.append(f"\033[{3 if italic else 23}m") + if blink is not None: + bits.append(f"\033[{5 if blink else 25}m") + if reverse is not None: + bits.append(f"\033[{7 if reverse else 27}m") + if strikethrough is not None: + bits.append(f"\033[{9 if strikethrough else 29}m") + bits.append(text) + if reset: + bits.append(_ansi_reset_all) + return "".join(bits) + + +__all__ = ["style"] diff --git a/mitmproxy/platform/windows.py b/mitmproxy/platform/windows.py index a9cc2c73c..486d655e4 100644 --- a/mitmproxy/platform/windows.py +++ b/mitmproxy/platform/windows.py @@ -1,3 +1,5 @@ +import collections +import collections.abc import contextlib import ctypes import ctypes.wintypes @@ -11,13 +13,9 @@ import threading import time import typing -import click -import collections -import collections.abc import pydivert import pydivert.consts - REDIRECT_API_HOST = "127.0.0.1" REDIRECT_API_PORT = 8085 @@ -557,43 +555,42 @@ class TransparentProxy: self.local.trusted_pids.remove(pid) -@click.group() -def cli(): - pass - - -@cli.command() -@click.option("--local/--no-local", default=True, - help="Redirect the host's own traffic.") -@click.option("--forward/--no-forward", default=True, - help="Redirect traffic that's forwarded by the host.") -@click.option("--filter", type=str, metavar="WINDIVERT_FILTER", - help="Custom WinDivert interception rule.") -@click.option("-p", "--proxy-port", type=int, metavar="8080", default=8080, - help="The port mitmproxy is listening on.") -def redirect(**options): - """Redirect flows to mitmproxy.""" - proxy = TransparentProxy(**options) - proxy.start() - print(f" * Redirection active.") - print(f" Filter: {proxy.filter}") - try: - while True: - time.sleep(1) - except KeyboardInterrupt: - print(" * Shutting down...") - proxy.shutdown() - print(" * Shut down.") - - -@cli.command() -def connections(): - """List all TCP connections and the associated PIDs.""" - connections = TcpConnectionTable() - connections.refresh() - for (ip, port), pid in connections.items(): - print(f"{ip}:{port} -> {pid}") - - if __name__ == "__main__": + import click + + @click.group() + def cli(): + pass + + @cli.command() + @click.option("--local/--no-local", default=True, + help="Redirect the host's own traffic.") + @click.option("--forward/--no-forward", default=True, + help="Redirect traffic that's forwarded by the host.") + @click.option("--filter", type=str, metavar="WINDIVERT_FILTER", + help="Custom WinDivert interception rule.") + @click.option("-p", "--proxy-port", type=int, metavar="8080", default=8080, + help="The port mitmproxy is listening on.") + def redirect(**options): + """Redirect flows to mitmproxy.""" + proxy = TransparentProxy(**options) + proxy.start() + print(f" * Redirection active.") + print(f" Filter: {proxy.filter}") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + print(" * Shutting down...") + proxy.shutdown() + print(" * Shut down.") + + @cli.command() + def connections(): + """List all TCP connections and the associated PIDs.""" + connections = TcpConnectionTable() + connections.refresh() + for (ip, port), pid in connections.items(): + print(f"{ip}:{port} -> {pid}") + cli() diff --git a/setup.cfg b/setup.cfg index f3da275af..746ff90a5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -49,7 +49,6 @@ exclude = [tool:individual_coverage] exclude = mitmproxy/addons/onboarding.py - mitmproxy/addons/termlog.py mitmproxy/connections.py mitmproxy/contentviews/base.py mitmproxy/contentviews/grpc.py diff --git a/setup.py b/setup.py index 23a1d1d13..acba8f142 100644 --- a/setup.py +++ b/setup.py @@ -71,7 +71,6 @@ setup( "blinker>=1.4, <1.5", "Brotli>=1.0,<1.1", "certifi>=2019.9.11", # no semver here - this should always be on the last release! - "click>=7.0,<8.1", "cryptography>=36,<37", "flask>=1.1.1,<2.1", "h11>=0.11,<0.14", @@ -100,6 +99,7 @@ setup( "pydivert>=2.0.3,<2.2", ], 'dev': [ + "click>=7.0,<8.1", "hypothesis>=5.8,<7", "parver>=0.1,<2.0", "pdoc>=4.0.0", diff --git a/test/mitmproxy/addons/test_dumper.py b/test/mitmproxy/addons/test_dumper.py index c65afc50b..c046a1ea0 100644 --- a/test/mitmproxy/addons/test_dumper.py +++ b/test/mitmproxy/addons/test_dumper.py @@ -167,16 +167,15 @@ def test_echo_request_line(): sio.truncate(0) -class TestContentView: - async def test_contentview(self): - with mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__") as va: - va.side_effect = ValueError("") - sio = io.StringIO() - d = dumper.Dumper(sio) - with taddons.context(d) as tctx: - tctx.configure(d, flow_detail=4) - d.response(tflow.tflow()) - await tctx.master.await_log("content viewer failed") +async def test_contentview(): + with mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__") as va: + va.side_effect = ValueError("") + sio = io.StringIO() + d = dumper.Dumper(sio) + with taddons.context(d) as tctx: + tctx.configure(d, flow_detail=4) + d.response(tflow.tflow()) + await tctx.master.await_log("content viewer failed") def test_tcp(): @@ -240,3 +239,13 @@ def test_http2(): f.response.http_version = b"HTTP/2.0" d.response(f) assert "HTTP/2.0 200 OK" in sio.getvalue() + + +def test_styling(): + sio = io.StringIO() + sio.isatty = lambda: True + + d = dumper.Dumper(sio) + with taddons.context(d): + d.response(tflow.tflow(resp=True)) + assert "\x1b[" in sio.getvalue() diff --git a/test/mitmproxy/addons/test_termlog.py b/test/mitmproxy/addons/test_termlog.py index c245bd110..6441f3a64 100644 --- a/test/mitmproxy/addons/test_termlog.py +++ b/test/mitmproxy/addons/test_termlog.py @@ -1,29 +1,31 @@ -import sys -import pytest +import io -from mitmproxy.addons import termlog from mitmproxy import log +from mitmproxy.addons import termlog from mitmproxy.test import taddons -from test.conftest import skip_windows -class TestTermLog: - @skip_windows # not sure why this is suddenly necessary (03/2022) - @pytest.mark.usefixtures('capfd') - @pytest.mark.parametrize('outfile, expected_out, expected_err', [ - (None, ['one', 'three'], ['four']), - (sys.stdout, ['one', 'three', 'four'], []), - (sys.stderr, [], ['one', 'three', 'four']), - ]) - def test_output(self, outfile, expected_out, expected_err, capfd): - t = termlog.TermLog(outfile=outfile) - with taddons.context(t) as tctx: - tctx.options.termlog_verbosity = "info" - tctx.configure(t) - t.add_log(log.LogEntry("one", "info")) - t.add_log(log.LogEntry("two", "debug")) - t.add_log(log.LogEntry("three", "warn")) - t.add_log(log.LogEntry("four", "error")) - out, err = capfd.readouterr() - assert out.strip().splitlines() == expected_out - assert err.strip().splitlines() == expected_err +def test_output(capsys): + t = termlog.TermLog() + with taddons.context(t) as tctx: + tctx.options.termlog_verbosity = "info" + tctx.configure(t) + t.add_log(log.LogEntry("one", "info")) + t.add_log(log.LogEntry("two", "debug")) + t.add_log(log.LogEntry("three", "warn")) + t.add_log(log.LogEntry("four", "error")) + out, err = capsys.readouterr() + assert out.strip().splitlines() == ["one", "three"] + assert err.strip().splitlines() == ["four"] + + +def test_styling() -> None: + f = io.StringIO() + f.isatty = lambda: True + t = termlog.TermLog(out=f) + + with taddons.context(t) as tctx: + tctx.configure(t) + t.add_log(log.LogEntry("hello world", "info")) + + assert f.getvalue() == "\x1b[22mhello world\x1b[0m\n"