refactor common.format_keyvals

the semantics here were really quite unclear,
now it is hopefully a bit more obvious what's happening.
Once we are Python 3.6+ exclusively, we may consider changing
the signature to accept a (order-preserving) dict instead of a list.
This commit is contained in:
Maximilian Hils 2018-01-05 16:09:43 +01:00
parent 9b03ab59ef
commit 2e2daeed89
6 changed files with 116 additions and 97 deletions

View File

@ -1,9 +1,10 @@
import platform import platform
import typing
from functools import lru_cache
import urwid import urwid
import urwid.util import urwid.util
from functools import lru_cache
from mitmproxy.utils import human from mitmproxy.utils import human
# Detect Windows Subsystem for Linux # Detect Windows Subsystem for Linux
@ -43,39 +44,46 @@ def highlight_key(str, key, textattr="text", keyattr="key"):
KEY_MAX = 30 KEY_MAX = 30
def format_keyvals(lst, key="key", val="text", indent=0): def format_keyvals(
entries: typing.List[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]],
key_format: str = "key",
value_format: str = "text",
indent: int = 0
) -> typing.List[urwid.Columns]:
""" """
Format a list of (key, value) tuples. Format a list of (key, value) tuples.
If key is None, it's treated specially: Args:
- We assume a sub-value, and add an extra indent. entries: The list to format. keys must be strings, values can also be None or urwid widgets.
- The value is treated as a pre-formatted list of directives. The latter makes it possible to use the result of format_keyvals() as a value.
key_format: The display attribute for the key.
value_format: The display attribute for the value.
indent: Additional indent to apply.
""" """
max_key_len = max((len(k) for k, v in entries if k is not None), default=0)
max_key_len = min(max_key_len, KEY_MAX)
if indent > 2:
indent -= 2 # We use dividechars=2 below, which already adds two empty spaces
ret = [] ret = []
if lst: for k, v in entries:
maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX) if v is None:
for i, kv in enumerate(lst):
if kv is None:
ret.append(urwid.Text(""))
else:
if isinstance(kv[1], urwid.Widget):
v = kv[1]
elif kv[1] is None:
v = urwid.Text("") v = urwid.Text("")
else: elif not isinstance(v, urwid.Widget):
v = urwid.Text([(val, kv[1])]) v = urwid.Text([(value_format, v)])
ret.append( ret.append(
urwid.Columns( urwid.Columns(
[ [
("fixed", indent, urwid.Text("")), ("fixed", indent, urwid.Text("")),
( (
"fixed", "fixed",
maxk, max_key_len,
urwid.Text([(key, kv[0] or "")]) urwid.Text([(key_format, k)])
), ),
v v
], ],
dividechars = 2 dividechars=2
) )
) )
return ret return ret

View File

@ -23,157 +23,157 @@ def flowdetails(state, flow: http.HTTPFlow):
metadata = flow.metadata metadata = flow.metadata
if metadata is not None and len(metadata) > 0: if metadata is not None and len(metadata) > 0:
parts = [[str(k), repr(v)] for k, v in metadata.items()] parts = [(str(k), repr(v)) for k, v in metadata.items()]
text.append(urwid.Text([("head", "Metadata:")])) text.append(urwid.Text([("head", "Metadata:")]))
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) text.extend(common.format_keyvals(parts, indent=4))
if sc is not None and sc.ip_address: if sc is not None and sc.ip_address:
text.append(urwid.Text([("head", "Server Connection:")])) text.append(urwid.Text([("head", "Server Connection:")]))
parts = [ parts = [
["Address", human.format_address(sc.address)], ("Address", human.format_address(sc.address)),
] ]
if sc.ip_address: if sc.ip_address:
parts.append(["Resolved Address", human.format_address(sc.ip_address)]) parts.append(("Resolved Address", human.format_address(sc.ip_address)))
if resp: if resp:
parts.append(["HTTP Version", resp.http_version]) parts.append(("HTTP Version", resp.http_version))
if sc.alpn_proto_negotiated: if sc.alpn_proto_negotiated:
parts.append(["ALPN", sc.alpn_proto_negotiated]) parts.append(("ALPN", sc.alpn_proto_negotiated))
text.extend( text.extend(
common.format_keyvals(parts, key="key", val="text", indent=4) common.format_keyvals(parts, indent=4)
) )
c = sc.cert c = sc.cert
if c: if c:
text.append(urwid.Text([("head", "Server Certificate:")])) text.append(urwid.Text([("head", "Server Certificate:")]))
parts = [ parts = [
["Type", "%s, %s bits" % c.keyinfo], ("Type", "%s, %s bits" % c.keyinfo),
["SHA1 digest", c.digest("sha1")], ("SHA1 digest", c.digest("sha1")),
["Valid to", str(c.notafter)], ("Valid to", str(c.notafter)),
["Valid from", str(c.notbefore)], ("Valid from", str(c.notbefore)),
["Serial", str(c.serial)], ("Serial", str(c.serial)),
[ (
"Subject", "Subject",
urwid.BoxAdapter( urwid.BoxAdapter(
urwid.ListBox( urwid.ListBox(
common.format_keyvals( common.format_keyvals(
c.subject, c.subject,
key="highlight", key_format="highlight"
val="text"
) )
), ),
len(c.subject) len(c.subject)
) )
], ),
[ (
"Issuer", "Issuer",
urwid.BoxAdapter( urwid.BoxAdapter(
urwid.ListBox( urwid.ListBox(
common.format_keyvals( common.format_keyvals(
c.issuer, key="highlight", val="text" c.issuer,
key_format="highlight"
) )
), ),
len(c.issuer) len(c.issuer)
) )
] )
] ]
if c.altnames: if c.altnames:
parts.append( parts.append(
[ (
"Alt names", "Alt names",
", ".join(strutils.bytes_to_escaped_str(x) for x in c.altnames) ", ".join(strutils.bytes_to_escaped_str(x) for x in c.altnames)
] )
) )
text.extend( text.extend(
common.format_keyvals(parts, key="key", val="text", indent=4) common.format_keyvals(parts, indent=4)
) )
if cc is not None: if cc is not None:
text.append(urwid.Text([("head", "Client Connection:")])) text.append(urwid.Text([("head", "Client Connection:")]))
parts = [ parts = [
["Address", "{}:{}".format(cc.address[0], cc.address[1])], ("Address", "{}:{}".format(cc.address[0], cc.address[1])),
] ]
if req: if req:
parts.append(["HTTP Version", req.http_version]) parts.append(("HTTP Version", req.http_version))
if cc.tls_version: if cc.tls_version:
parts.append(["TLS Version", cc.tls_version]) parts.append(("TLS Version", cc.tls_version))
if cc.sni: if cc.sni:
parts.append(["Server Name Indication", cc.sni]) parts.append(("Server Name Indication", cc.sni))
if cc.cipher_name: if cc.cipher_name:
parts.append(["Cipher Name", cc.cipher_name]) parts.append(("Cipher Name", cc.cipher_name))
if cc.alpn_proto_negotiated: if cc.alpn_proto_negotiated:
parts.append(["ALPN", cc.alpn_proto_negotiated]) parts.append(("ALPN", cc.alpn_proto_negotiated))
text.extend( text.extend(
common.format_keyvals(parts, key="key", val="text", indent=4) common.format_keyvals(parts, indent=4)
) )
parts = [] parts = []
if cc is not None and cc.timestamp_start: if cc is not None and cc.timestamp_start:
parts.append( parts.append(
[ (
"Client conn. established", "Client conn. established",
maybe_timestamp(cc, "timestamp_start") maybe_timestamp(cc, "timestamp_start")
] )
) )
if cc.ssl_established: if cc.ssl_established:
parts.append( parts.append(
[ (
"Client conn. TLS handshake", "Client conn. TLS handshake",
maybe_timestamp(cc, "timestamp_ssl_setup") maybe_timestamp(cc, "timestamp_ssl_setup")
] )
) )
if sc is not None and sc.timestamp_start: if sc is not None and sc.timestamp_start:
parts.append( parts.append(
[ (
"Server conn. initiated", "Server conn. initiated",
maybe_timestamp(sc, "timestamp_start") maybe_timestamp(sc, "timestamp_start")
] )
) )
parts.append( parts.append(
[ (
"Server conn. TCP handshake", "Server conn. TCP handshake",
maybe_timestamp(sc, "timestamp_tcp_setup") maybe_timestamp(sc, "timestamp_tcp_setup")
] )
) )
if sc.ssl_established: if sc.ssl_established:
parts.append( parts.append(
[ (
"Server conn. TLS handshake", "Server conn. TLS handshake",
maybe_timestamp(sc, "timestamp_ssl_setup") maybe_timestamp(sc, "timestamp_ssl_setup")
] )
) )
if req is not None and req.timestamp_start: if req is not None and req.timestamp_start:
parts.append( parts.append(
[ (
"First request byte", "First request byte",
maybe_timestamp(req, "timestamp_start") maybe_timestamp(req, "timestamp_start")
] )
) )
parts.append( parts.append(
[ (
"Request complete", "Request complete",
maybe_timestamp(req, "timestamp_end") maybe_timestamp(req, "timestamp_end")
] )
) )
if resp is not None and resp.timestamp_start: if resp is not None and resp.timestamp_start:
parts.append( parts.append(
[ (
"First response byte", "First response byte",
maybe_timestamp(resp, "timestamp_start") maybe_timestamp(resp, "timestamp_start")
] )
) )
parts.append( parts.append(
[ (
"Response complete", "Response complete",
maybe_timestamp(resp, "timestamp_end") maybe_timestamp(resp, "timestamp_end")
] )
) )
if parts: if parts:
@ -181,6 +181,6 @@ def flowdetails(state, flow: http.HTTPFlow):
parts = sorted(parts, key=lambda p: p[1]) parts = sorted(parts, key=lambda p: p[1])
text.append(urwid.Text([("head", "Timing:")])) text.append(urwid.Text([("head", "Timing:")]))
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) text.extend(common.format_keyvals(parts, indent=4))
return searchable.Searchable(text) return searchable.Searchable(text)

View File

@ -154,8 +154,7 @@ class FlowDetails(tabs.Tabs):
if conn: if conn:
txt = common.format_keyvals( txt = common.format_keyvals(
[(h + ":", v) for (h, v) in conn.headers.items(multi=True)], [(h + ":", v) for (h, v) in conn.headers.items(multi=True)],
key = "header", key_format="header"
val = "text"
) )
viewmode = self.master.commands.call("console.flowview.mode") viewmode = self.master.commands.call("console.flowview.mode")
msg, body = self.content_view(viewmode, conn) msg, body = self.content_view(viewmode, conn)

View File

@ -76,7 +76,7 @@ class HelpView(tabs.Tabs, layoutwidget.LayoutWidget):
def filtexp(self): def filtexp(self):
text = [] text = []
text.extend(common.format_keyvals(flowfilter.help, key="key", val="text", indent=4)) text.extend(common.format_keyvals(flowfilter.help, indent=4))
text.append( text.append(
urwid.Text( urwid.Text(
[ [
@ -96,7 +96,7 @@ class HelpView(tabs.Tabs, layoutwidget.LayoutWidget):
("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
] ]
text.extend( text.extend(
common.format_keyvals(examples, key="key", val="text", indent=4) common.format_keyvals(examples, indent=4)
) )
return CListBox(text) return CListBox(text)

View File

@ -1,3 +1,5 @@
import urwid
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.tools.console import common from mitmproxy.tools.console import common
@ -7,3 +9,26 @@ def test_format_flow():
assert common.format_flow(f, True) assert common.format_flow(f, True)
assert common.format_flow(f, True, hostheader=True) assert common.format_flow(f, True, hostheader=True)
assert common.format_flow(f, True, extended=True) assert common.format_flow(f, True, extended=True)
def test_format_keyvals():
assert common.format_keyvals(
[
("aa", "bb"),
("cc", "dd"),
("ee", None),
]
)
wrapped = urwid.BoxAdapter(
urwid.ListBox(
urwid.SimpleFocusListWalker(
common.format_keyvals([("foo", "bar")])
)
), 1
)
assert wrapped.render((30, ))
assert common.format_keyvals(
[
("aa", wrapped)
]
)

View File

@ -4,22 +4,9 @@ from mitmproxy import options
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils from mitmproxy.test import tutils
from mitmproxy.tools import console from mitmproxy.tools import console
from mitmproxy.tools.console import common
from ... import tservers from ... import tservers
def test_format_keyvals():
assert common.format_keyvals(
[
("aa", "bb"),
None,
("cc", "dd"),
(None, "dd"),
(None, "dd"),
]
)
def test_options(): def test_options():
assert options.Options(replay_kill_extra=True) assert options.Options(replay_kill_extra=True)