diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py index 1ef1b751c..8a8427995 100644 --- a/mitmproxy/tools/console/common.py +++ b/mitmproxy/tools/console/common.py @@ -1,9 +1,10 @@ import platform +import typing +from functools import lru_cache import urwid import urwid.util -from functools import lru_cache from mitmproxy.utils import human # Detect Windows Subsystem for Linux @@ -43,41 +44,48 @@ def highlight_key(str, key, textattr="text", keyattr="key"): KEY_MAX = 30 -def format_keyvals(lst, key="key", val="text", indent=0): +def format_keyvals( + entries: typing.List[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]], + key_format: str = "key", + value_format: str = "text", + indent: int = 0 +) -> typing.List[urwid.Columns]: """ - Format a list of (key, value) tuples. + Format a list of (key, value) tuples. - If key is None, it's treated specially: - - We assume a sub-value, and add an extra indent. - - The value is treated as a pre-formatted list of directives. + Args: + entries: The list to format. keys must be strings, values can also be None or urwid widgets. + The latter makes it possible to use the result of format_keyvals() as a value. + key_format: The display attribute for the key. + value_format: The display attribute for the value. + indent: Additional indent to apply. """ + max_key_len = max((len(k) for k, v in entries if k is not None), default=0) + max_key_len = min(max_key_len, KEY_MAX) + + if indent > 2: + indent -= 2 # We use dividechars=2 below, which already adds two empty spaces + ret = [] - if lst: - maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX) - for i, kv in enumerate(lst): - if kv is None: - ret.append(urwid.Text("")) - else: - if isinstance(kv[1], urwid.Widget): - v = kv[1] - elif kv[1] is None: - v = urwid.Text("") - else: - v = urwid.Text([(val, kv[1])]) - ret.append( - urwid.Columns( - [ - ("fixed", indent, urwid.Text("")), - ( - "fixed", - maxk, - urwid.Text([(key, kv[0] or "")]) - ), - v - ], - dividechars = 2 - ) - ) + for k, v in entries: + if v is None: + v = urwid.Text("") + elif not isinstance(v, urwid.Widget): + v = urwid.Text([(value_format, v)]) + ret.append( + urwid.Columns( + [ + ("fixed", indent, urwid.Text("")), + ( + "fixed", + max_key_len, + urwid.Text([(key_format, k)]) + ), + v + ], + dividechars=2 + ) + ) return ret diff --git a/mitmproxy/tools/console/flowdetailview.py b/mitmproxy/tools/console/flowdetailview.py index 28fe1fbc0..32ac4b605 100644 --- a/mitmproxy/tools/console/flowdetailview.py +++ b/mitmproxy/tools/console/flowdetailview.py @@ -23,157 +23,157 @@ def flowdetails(state, flow: http.HTTPFlow): metadata = flow.metadata if metadata is not None and len(metadata) > 0: - parts = [[str(k), repr(v)] for k, v in metadata.items()] + parts = [(str(k), repr(v)) for k, v in metadata.items()] text.append(urwid.Text([("head", "Metadata:")])) - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) + text.extend(common.format_keyvals(parts, indent=4)) if sc is not None and sc.ip_address: text.append(urwid.Text([("head", "Server Connection:")])) parts = [ - ["Address", human.format_address(sc.address)], + ("Address", human.format_address(sc.address)), ] if sc.ip_address: - parts.append(["Resolved Address", human.format_address(sc.ip_address)]) + parts.append(("Resolved Address", human.format_address(sc.ip_address))) if resp: - parts.append(["HTTP Version", resp.http_version]) + parts.append(("HTTP Version", resp.http_version)) if sc.alpn_proto_negotiated: - parts.append(["ALPN", sc.alpn_proto_negotiated]) + parts.append(("ALPN", sc.alpn_proto_negotiated)) text.extend( - common.format_keyvals(parts, key="key", val="text", indent=4) + common.format_keyvals(parts, indent=4) ) c = sc.cert if c: text.append(urwid.Text([("head", "Server Certificate:")])) parts = [ - ["Type", "%s, %s bits" % c.keyinfo], - ["SHA1 digest", c.digest("sha1")], - ["Valid to", str(c.notafter)], - ["Valid from", str(c.notbefore)], - ["Serial", str(c.serial)], - [ + ("Type", "%s, %s bits" % c.keyinfo), + ("SHA1 digest", c.digest("sha1")), + ("Valid to", str(c.notafter)), + ("Valid from", str(c.notbefore)), + ("Serial", str(c.serial)), + ( "Subject", urwid.BoxAdapter( urwid.ListBox( common.format_keyvals( c.subject, - key="highlight", - val="text" + key_format="highlight" ) ), len(c.subject) ) - ], - [ + ), + ( "Issuer", urwid.BoxAdapter( urwid.ListBox( common.format_keyvals( - c.issuer, key="highlight", val="text" + c.issuer, + key_format="highlight" ) ), len(c.issuer) ) - ] + ) ] if c.altnames: parts.append( - [ + ( "Alt names", ", ".join(strutils.bytes_to_escaped_str(x) for x in c.altnames) - ] + ) ) text.extend( - common.format_keyvals(parts, key="key", val="text", indent=4) + common.format_keyvals(parts, indent=4) ) if cc is not None: text.append(urwid.Text([("head", "Client Connection:")])) parts = [ - ["Address", "{}:{}".format(cc.address[0], cc.address[1])], + ("Address", "{}:{}".format(cc.address[0], cc.address[1])), ] if req: - parts.append(["HTTP Version", req.http_version]) + parts.append(("HTTP Version", req.http_version)) if cc.tls_version: - parts.append(["TLS Version", cc.tls_version]) + parts.append(("TLS Version", cc.tls_version)) if cc.sni: - parts.append(["Server Name Indication", cc.sni]) + parts.append(("Server Name Indication", cc.sni)) if cc.cipher_name: - parts.append(["Cipher Name", cc.cipher_name]) + parts.append(("Cipher Name", cc.cipher_name)) if cc.alpn_proto_negotiated: - parts.append(["ALPN", cc.alpn_proto_negotiated]) + parts.append(("ALPN", cc.alpn_proto_negotiated)) text.extend( - common.format_keyvals(parts, key="key", val="text", indent=4) + common.format_keyvals(parts, indent=4) ) parts = [] if cc is not None and cc.timestamp_start: parts.append( - [ + ( "Client conn. established", maybe_timestamp(cc, "timestamp_start") - ] + ) ) if cc.ssl_established: parts.append( - [ + ( "Client conn. TLS handshake", maybe_timestamp(cc, "timestamp_ssl_setup") - ] + ) ) if sc is not None and sc.timestamp_start: parts.append( - [ + ( "Server conn. initiated", maybe_timestamp(sc, "timestamp_start") - ] + ) ) parts.append( - [ + ( "Server conn. TCP handshake", maybe_timestamp(sc, "timestamp_tcp_setup") - ] + ) ) if sc.ssl_established: parts.append( - [ + ( "Server conn. TLS handshake", maybe_timestamp(sc, "timestamp_ssl_setup") - ] + ) ) if req is not None and req.timestamp_start: parts.append( - [ + ( "First request byte", maybe_timestamp(req, "timestamp_start") - ] + ) ) parts.append( - [ + ( "Request complete", maybe_timestamp(req, "timestamp_end") - ] + ) ) if resp is not None and resp.timestamp_start: parts.append( - [ + ( "First response byte", maybe_timestamp(resp, "timestamp_start") - ] + ) ) parts.append( - [ + ( "Response complete", maybe_timestamp(resp, "timestamp_end") - ] + ) ) if parts: @@ -181,6 +181,6 @@ def flowdetails(state, flow: http.HTTPFlow): parts = sorted(parts, key=lambda p: p[1]) text.append(urwid.Text([("head", "Timing:")])) - text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) + text.extend(common.format_keyvals(parts, indent=4)) return searchable.Searchable(text) diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py index 05d2573f8..8d572f7b1 100644 --- a/mitmproxy/tools/console/flowview.py +++ b/mitmproxy/tools/console/flowview.py @@ -154,8 +154,7 @@ class FlowDetails(tabs.Tabs): if conn: txt = common.format_keyvals( [(h + ":", v) for (h, v) in conn.headers.items(multi=True)], - key = "header", - val = "text" + key_format="header" ) viewmode = self.master.commands.call("console.flowview.mode") msg, body = self.content_view(viewmode, conn) diff --git a/mitmproxy/tools/console/help.py b/mitmproxy/tools/console/help.py index 439289f63..1b4b9ac61 100644 --- a/mitmproxy/tools/console/help.py +++ b/mitmproxy/tools/console/help.py @@ -76,7 +76,7 @@ class HelpView(tabs.Tabs, layoutwidget.LayoutWidget): def filtexp(self): text = [] - text.extend(common.format_keyvals(flowfilter.help, key="key", val="text", indent=4)) + text.extend(common.format_keyvals(flowfilter.help, indent=4)) text.append( urwid.Text( [ @@ -96,7 +96,7 @@ class HelpView(tabs.Tabs, layoutwidget.LayoutWidget): ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), ] text.extend( - common.format_keyvals(examples, key="key", val="text", indent=4) + common.format_keyvals(examples, indent=4) ) return CListBox(text) diff --git a/test/mitmproxy/tools/console/test_common.py b/test/mitmproxy/tools/console/test_common.py index a996c0104..72438c496 100644 --- a/test/mitmproxy/tools/console/test_common.py +++ b/test/mitmproxy/tools/console/test_common.py @@ -1,3 +1,5 @@ +import urwid + from mitmproxy.test import tflow from mitmproxy.tools.console import common @@ -7,3 +9,26 @@ def test_format_flow(): assert common.format_flow(f, True) assert common.format_flow(f, True, hostheader=True) assert common.format_flow(f, True, extended=True) + + +def test_format_keyvals(): + assert common.format_keyvals( + [ + ("aa", "bb"), + ("cc", "dd"), + ("ee", None), + ] + ) + wrapped = urwid.BoxAdapter( + urwid.ListBox( + urwid.SimpleFocusListWalker( + common.format_keyvals([("foo", "bar")]) + ) + ), 1 + ) + assert wrapped.render((30, )) + assert common.format_keyvals( + [ + ("aa", wrapped) + ] + ) diff --git a/test/mitmproxy/tools/console/test_master.py b/test/mitmproxy/tools/console/test_master.py index 3aa0dc546..9779a4824 100644 --- a/test/mitmproxy/tools/console/test_master.py +++ b/test/mitmproxy/tools/console/test_master.py @@ -4,22 +4,9 @@ from mitmproxy import options from mitmproxy.test import tflow from mitmproxy.test import tutils from mitmproxy.tools import console -from mitmproxy.tools.console import common from ... import tservers -def test_format_keyvals(): - assert common.format_keyvals( - [ - ("aa", "bb"), - None, - ("cc", "dd"), - (None, "dd"), - (None, "dd"), - ] - ) - - def test_options(): assert options.Options(replay_kill_extra=True)