vendor click.style instead of depending on click (#5188)

click introduces subdependencies like colorama, this approach is much simpler.
This commit is contained in:
Maximilian Hils 2022-03-16 18:02:53 +01:00 committed by GitHub
parent b5abbc97e3
commit ecd4790cbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 315 additions and 128 deletions

View File

@ -1,20 +1,21 @@
import itertools
import shutil
import sys
from typing import IO, Optional, Union
import click
from wsproto.frame_protocol import CloseReason
from mitmproxy import contentviews
from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy import flow
from mitmproxy.contrib import click as miniclick
from mitmproxy.tcp import TCPFlow, TCPMessage
from mitmproxy.utils import human
from mitmproxy.utils import strutils
from mitmproxy.websocket import WebSocketMessage, WebSocketData
from wsproto.frame_protocol import CloseReason
from mitmproxy.websocket import WebSocketData, WebSocketMessage
def indent(n: int, text: str) -> str:
@ -23,16 +24,19 @@ def indent(n: int, text: str) -> str:
return "\n".join(pad + i for i in l)
def colorful(line, styles):
yield " " # we can already indent here
for (style, text) in line:
yield click.style(text, **styles.get(style, {}))
CONTENTVIEW_STYLES = {
"highlight": dict(bold=True),
"offset": dict(fg="blue"),
"header": dict(fg="green", bold=True),
"text": dict(fg="green"),
}
class Dumper:
def __init__(self, outfile=None):
def __init__(self, outfile: Optional[IO[str]] = None):
self.filter: Optional[flowfilter.TFilter] = None
self.outfp: Optional[IO] = outfile
self.outfp: IO[str] = outfile or sys.stdout
self.isatty = self.outfp.isatty()
def load(self, loader):
loader.add_option(
@ -66,29 +70,35 @@ class Dumper:
else:
self.filter = None
def style(self, text: str, **style) -> str:
if style and self.isatty:
text = miniclick.style(text, **style)
return text
def echo(self, text: str, ident=None, **style):
if ident:
text = indent(ident, text)
click.secho(text, file=self.outfp, err=False, **style)
if self.outfp:
self.outfp.flush()
text = self.style(text, **style)
print(text, file=self.outfp)
def _echo_headers(self, headers: http.Headers):
for k, v in headers.fields:
ks = strutils.bytes_to_escaped_str(k)
ks = self.style(ks, fg='blue')
vs = strutils.bytes_to_escaped_str(v)
out = "{}: {}".format(
click.style(ks, fg="blue"),
click.style(vs)
)
self.echo(out, ident=4)
self.echo(f"{ks}: {vs}", ident=4)
def _echo_trailers(self, trailers: Optional[http.Headers]):
if not trailers:
return
self.echo(click.style("--- HTTP Trailers", fg="magenta"), ident=4)
self.echo("--- HTTP Trailers", fg="magenta", ident=4)
self._echo_headers(trailers)
def _colorful(self, line):
yield " " # we can already indent here
for (style, text) in line:
yield self.style(text, **CONTENTVIEW_STYLES.get(style, {}))
def _echo_message(
self,
message: Union[http.Message, TCPMessage, WebSocketMessage],
@ -107,15 +117,8 @@ class Dumper:
else:
lines_to_echo = lines
styles = dict(
highlight=dict(bold=True),
offset=dict(fg="blue"),
header=dict(fg="green", bold=True),
text=dict(fg="green")
)
content = "\r\n".join(
"".join(colorful(line, styles)) for line in lines_to_echo
"".join(self._colorful(line)) for line in lines_to_echo
)
if content:
self.echo("")
@ -129,9 +132,9 @@ class Dumper:
def _echo_request_line(self, flow: http.HTTPFlow) -> None:
if flow.is_replay == "request":
client = click.style("[replay]", fg="yellow", bold=True)
client = self.style("[replay]", fg="yellow", bold=True)
elif flow.client_conn.peername:
client = click.style(
client = self.style(
strutils.escape_control_characters(
human.format_address(flow.client_conn.peername)
)
@ -146,7 +149,7 @@ class Dumper:
GET="green",
DELETE="red"
).get(method.upper(), "magenta")
method = click.style(
method = self.style(
strutils.escape_control_characters(method),
fg=method_color,
bold=True
@ -161,7 +164,7 @@ class Dumper:
terminal_width_limit = max(shutil.get_terminal_size()[0] - 25, 50)
if len(url) > terminal_width_limit:
url = url[:terminal_width_limit] + ""
url = click.style(strutils.escape_control_characters(url), bold=True)
url = self.style(strutils.escape_control_characters(url), bold=True)
http_version = ""
if (
@ -176,7 +179,7 @@ class Dumper:
def _echo_response_line(self, flow: http.HTTPFlow) -> None:
if flow.is_replay == "response":
replay_str = "[replay]"
replay = click.style(replay_str, fg="yellow", bold=True)
replay = self.style(replay_str, fg="yellow", bold=True)
else:
replay_str = ""
replay = ""
@ -190,7 +193,7 @@ class Dumper:
code_color = "magenta"
elif 400 <= code_int < 600:
code_color = "red"
code = click.style(
code = self.style(
str(code_int),
fg=code_color,
bold=True,
@ -201,7 +204,7 @@ class Dumper:
reason = flow.response.reason
else:
reason = http.status_codes.RESPONSES.get(flow.response.status_code, "")
reason = click.style(
reason = self.style(
strutils.escape_control_characters(reason),
fg=code_color,
bold=True
@ -211,7 +214,7 @@ class Dumper:
size = "(content missing)"
else:
size = human.pretty_size(len(flow.response.raw_content))
size = click.style(size, bold=True)
size = self.style(size, bold=True)
http_version = ""
if (
@ -221,7 +224,7 @@ class Dumper:
# Hide version for h1 <-> h1 connections.
http_version = f"{flow.response.http_version} "
arrows = click.style(" <<", bold=True)
arrows = self.style(" <<", bold=True)
if ctx.options.flow_detail == 1:
# This aligns the HTTP response code with the HTTP request method:
# 127.0.0.1:59519: GET http://example.com/
@ -255,6 +258,8 @@ class Dumper:
msg = strutils.escape_control_characters(f.error.msg)
self.echo(f" << {msg}", bold=True, fg="red")
self.outfp.flush()
def match(self, f):
if ctx.options.flow_detail == 0:
return False

View File

@ -1,14 +1,23 @@
import sys
from typing import IO, Optional
import click
from mitmproxy import log
from mitmproxy import ctx
from mitmproxy import log
from mitmproxy.contrib import click as miniclick
LOG_COLORS = {'error': "red", 'warn': "yellow", 'alert': "magenta"}
class TermLog:
def __init__(self, outfile=None):
self.outfile: Optional[IO] = outfile
def __init__(
self,
out: Optional[IO[str]] = None,
err: Optional[IO[str]] = None,
):
self.out_file: IO[str] = out or sys.stdout
self.out_isatty = self.out_file.isatty()
self.err_file: IO[str] = err or sys.stderr
self.err_isatty = self.err_file.isatty()
def load(self, loader):
loader.add_option(
@ -17,13 +26,20 @@ class TermLog:
choices=log.LogTierOrder
)
def add_log(self, e):
def add_log(self, e: log.LogEntry):
if log.log_tier(ctx.options.termlog_verbosity) >= log.log_tier(e.level):
click.secho(
e.msg,
file=self.outfile,
fg=dict(error="red", warn="yellow",
alert="magenta").get(e.level),
dim=(e.level == "debug"),
err=(e.level == "error")
)
if e.level == "error":
f = self.err_file
isatty = self.err_isatty
else:
f = self.out_file
isatty = self.out_isatty
msg = e.msg
if isatty:
msg = miniclick.style(
e.msg,
fg=LOG_COLORS.get(e.level),
dim=(e.level == "debug"),
)
print(msg, file=f)

View File

@ -0,0 +1,159 @@
"""
SPDX-License-Identifier: BSD-3-Clause
A vendored copy of click.style() @ 4f7b255
"""
import typing as t
_ansi_colors = {
"black": 30,
"red": 31,
"green": 32,
"yellow": 33,
"blue": 34,
"magenta": 35,
"cyan": 36,
"white": 37,
"reset": 39,
"bright_black": 90,
"bright_red": 91,
"bright_green": 92,
"bright_yellow": 93,
"bright_blue": 94,
"bright_magenta": 95,
"bright_cyan": 96,
"bright_white": 97,
}
_ansi_reset_all = "\033[0m"
def _interpret_color(
color: t.Union[int, t.Tuple[int, int, int], str], offset: int = 0
) -> str:
if isinstance(color, int):
return f"{38 + offset};5;{color:d}"
if isinstance(color, (tuple, list)):
r, g, b = color
return f"{38 + offset};2;{r:d};{g:d};{b:d}"
return str(_ansi_colors[color] + offset)
def style(
text: t.Any,
fg: t.Optional[t.Union[int, t.Tuple[int, int, int], str]] = None,
bg: t.Optional[t.Union[int, t.Tuple[int, int, int], str]] = None,
bold: t.Optional[bool] = None,
dim: t.Optional[bool] = None,
underline: t.Optional[bool] = None,
overline: t.Optional[bool] = None,
italic: t.Optional[bool] = None,
blink: t.Optional[bool] = None,
reverse: t.Optional[bool] = None,
strikethrough: t.Optional[bool] = None,
reset: bool = True,
) -> str:
"""Styles a text with ANSI styles and returns the new string. By
default the styling is self contained which means that at the end
of the string a reset code is issued. This can be prevented by
passing ``reset=False``.
Examples::
click.echo(click.style('Hello World!', fg='green'))
click.echo(click.style('ATTENTION!', blink=True))
click.echo(click.style('Some things', reverse=True, fg='cyan'))
click.echo(click.style('More colors', fg=(255, 12, 128), bg=117))
Supported color names:
* ``black`` (might be a gray)
* ``red``
* ``green``
* ``yellow`` (might be an orange)
* ``blue``
* ``magenta``
* ``cyan``
* ``white`` (might be light gray)
* ``bright_black``
* ``bright_red``
* ``bright_green``
* ``bright_yellow``
* ``bright_blue``
* ``bright_magenta``
* ``bright_cyan``
* ``bright_white``
* ``reset`` (reset the color code only)
If the terminal supports it, color may also be specified as:
- An integer in the interval [0, 255]. The terminal must support
8-bit/256-color mode.
- An RGB tuple of three integers in [0, 255]. The terminal must
support 24-bit/true-color mode.
See https://en.wikipedia.org/wiki/ANSI_color and
https://gist.github.com/XVilka/8346728 for more information.
:param text: the string to style with ansi codes.
:param fg: if provided this will become the foreground color.
:param bg: if provided this will become the background color.
:param bold: if provided this will enable or disable bold mode.
:param dim: if provided this will enable or disable dim mode. This is
badly supported.
:param underline: if provided this will enable or disable underline.
:param overline: if provided this will enable or disable overline.
:param italic: if provided this will enable or disable italic.
:param blink: if provided this will enable or disable blinking.
:param reverse: if provided this will enable or disable inverse
rendering (foreground becomes background and the
other way round).
:param strikethrough: if provided this will enable or disable
striking through text.
:param reset: by default a reset-all code is added at the end of the
string which means that styles do not carry over. This
can be disabled to compose styles.
.. versionchanged:: 8.0
A non-string ``message`` is converted to a string.
.. versionchanged:: 8.0
Added support for 256 and RGB color codes.
.. versionchanged:: 8.0
Added the ``strikethrough``, ``italic``, and ``overline``
parameters.
.. versionchanged:: 7.0
Added support for bright colors.
.. versionadded:: 2.0
"""
if not isinstance(text, str):
text = str(text)
bits = []
if fg:
try:
bits.append(f"\033[{_interpret_color(fg)}m")
except KeyError:
raise TypeError(f"Unknown color {fg!r}") from None
if bg:
try:
bits.append(f"\033[{_interpret_color(bg, 10)}m")
except KeyError:
raise TypeError(f"Unknown color {bg!r}") from None
if bold is not None:
bits.append(f"\033[{1 if bold else 22}m")
if dim is not None:
bits.append(f"\033[{2 if dim else 22}m")
if underline is not None:
bits.append(f"\033[{4 if underline else 24}m")
if overline is not None:
bits.append(f"\033[{53 if overline else 55}m")
if italic is not None:
bits.append(f"\033[{3 if italic else 23}m")
if blink is not None:
bits.append(f"\033[{5 if blink else 25}m")
if reverse is not None:
bits.append(f"\033[{7 if reverse else 27}m")
if strikethrough is not None:
bits.append(f"\033[{9 if strikethrough else 29}m")
bits.append(text)
if reset:
bits.append(_ansi_reset_all)
return "".join(bits)
__all__ = ["style"]

View File

@ -1,3 +1,5 @@
import collections
import collections.abc
import contextlib
import ctypes
import ctypes.wintypes
@ -11,13 +13,9 @@ import threading
import time
import typing
import click
import collections
import collections.abc
import pydivert
import pydivert.consts
REDIRECT_API_HOST = "127.0.0.1"
REDIRECT_API_PORT = 8085
@ -557,43 +555,42 @@ class TransparentProxy:
self.local.trusted_pids.remove(pid)
@click.group()
def cli():
pass
@cli.command()
@click.option("--local/--no-local", default=True,
help="Redirect the host's own traffic.")
@click.option("--forward/--no-forward", default=True,
help="Redirect traffic that's forwarded by the host.")
@click.option("--filter", type=str, metavar="WINDIVERT_FILTER",
help="Custom WinDivert interception rule.")
@click.option("-p", "--proxy-port", type=int, metavar="8080", default=8080,
help="The port mitmproxy is listening on.")
def redirect(**options):
"""Redirect flows to mitmproxy."""
proxy = TransparentProxy(**options)
proxy.start()
print(f" * Redirection active.")
print(f" Filter: {proxy.filter}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print(" * Shutting down...")
proxy.shutdown()
print(" * Shut down.")
@cli.command()
def connections():
"""List all TCP connections and the associated PIDs."""
connections = TcpConnectionTable()
connections.refresh()
for (ip, port), pid in connections.items():
print(f"{ip}:{port} -> {pid}")
if __name__ == "__main__":
import click
@click.group()
def cli():
pass
@cli.command()
@click.option("--local/--no-local", default=True,
help="Redirect the host's own traffic.")
@click.option("--forward/--no-forward", default=True,
help="Redirect traffic that's forwarded by the host.")
@click.option("--filter", type=str, metavar="WINDIVERT_FILTER",
help="Custom WinDivert interception rule.")
@click.option("-p", "--proxy-port", type=int, metavar="8080", default=8080,
help="The port mitmproxy is listening on.")
def redirect(**options):
"""Redirect flows to mitmproxy."""
proxy = TransparentProxy(**options)
proxy.start()
print(f" * Redirection active.")
print(f" Filter: {proxy.filter}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print(" * Shutting down...")
proxy.shutdown()
print(" * Shut down.")
@cli.command()
def connections():
"""List all TCP connections and the associated PIDs."""
connections = TcpConnectionTable()
connections.refresh()
for (ip, port), pid in connections.items():
print(f"{ip}:{port} -> {pid}")
cli()

View File

@ -49,7 +49,6 @@ exclude =
[tool:individual_coverage]
exclude =
mitmproxy/addons/onboarding.py
mitmproxy/addons/termlog.py
mitmproxy/connections.py
mitmproxy/contentviews/base.py
mitmproxy/contentviews/grpc.py

View File

@ -71,7 +71,6 @@ setup(
"blinker>=1.4, <1.5",
"Brotli>=1.0,<1.1",
"certifi>=2019.9.11", # no semver here - this should always be on the last release!
"click>=7.0,<8.1",
"cryptography>=36,<37",
"flask>=1.1.1,<2.1",
"h11>=0.11,<0.14",
@ -100,6 +99,7 @@ setup(
"pydivert>=2.0.3,<2.2",
],
'dev': [
"click>=7.0,<8.1",
"hypothesis>=5.8,<7",
"parver>=0.1,<2.0",
"pdoc>=4.0.0",

View File

@ -167,16 +167,15 @@ def test_echo_request_line():
sio.truncate(0)
class TestContentView:
async def test_contentview(self):
with mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__") as va:
va.side_effect = ValueError("")
sio = io.StringIO()
d = dumper.Dumper(sio)
with taddons.context(d) as tctx:
tctx.configure(d, flow_detail=4)
d.response(tflow.tflow())
await tctx.master.await_log("content viewer failed")
async def test_contentview():
with mock.patch("mitmproxy.contentviews.auto.ViewAuto.__call__") as va:
va.side_effect = ValueError("")
sio = io.StringIO()
d = dumper.Dumper(sio)
with taddons.context(d) as tctx:
tctx.configure(d, flow_detail=4)
d.response(tflow.tflow())
await tctx.master.await_log("content viewer failed")
def test_tcp():
@ -240,3 +239,13 @@ def test_http2():
f.response.http_version = b"HTTP/2.0"
d.response(f)
assert "HTTP/2.0 200 OK" in sio.getvalue()
def test_styling():
sio = io.StringIO()
sio.isatty = lambda: True
d = dumper.Dumper(sio)
with taddons.context(d):
d.response(tflow.tflow(resp=True))
assert "\x1b[" in sio.getvalue()

View File

@ -1,29 +1,31 @@
import sys
import pytest
import io
from mitmproxy.addons import termlog
from mitmproxy import log
from mitmproxy.addons import termlog
from mitmproxy.test import taddons
from test.conftest import skip_windows
class TestTermLog:
@skip_windows # not sure why this is suddenly necessary (03/2022)
@pytest.mark.usefixtures('capfd')
@pytest.mark.parametrize('outfile, expected_out, expected_err', [
(None, ['one', 'three'], ['four']),
(sys.stdout, ['one', 'three', 'four'], []),
(sys.stderr, [], ['one', 'three', 'four']),
])
def test_output(self, outfile, expected_out, expected_err, capfd):
t = termlog.TermLog(outfile=outfile)
with taddons.context(t) as tctx:
tctx.options.termlog_verbosity = "info"
tctx.configure(t)
t.add_log(log.LogEntry("one", "info"))
t.add_log(log.LogEntry("two", "debug"))
t.add_log(log.LogEntry("three", "warn"))
t.add_log(log.LogEntry("four", "error"))
out, err = capfd.readouterr()
assert out.strip().splitlines() == expected_out
assert err.strip().splitlines() == expected_err
def test_output(capsys):
t = termlog.TermLog()
with taddons.context(t) as tctx:
tctx.options.termlog_verbosity = "info"
tctx.configure(t)
t.add_log(log.LogEntry("one", "info"))
t.add_log(log.LogEntry("two", "debug"))
t.add_log(log.LogEntry("three", "warn"))
t.add_log(log.LogEntry("four", "error"))
out, err = capsys.readouterr()
assert out.strip().splitlines() == ["one", "three"]
assert err.strip().splitlines() == ["four"]
def test_styling() -> None:
f = io.StringIO()
f.isatty = lambda: True
t = termlog.TermLog(out=f)
with taddons.context(t) as tctx:
tctx.configure(t)
t.add_log(log.LogEntry("hello world", "info"))
assert f.getvalue() == "\x1b[22mhello world\x1b[0m\n"