mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-23 08:11:00 +00:00
Merge pull request #2759 from mhils/issue-1833
Fix #1833, clean up common.format_keyvals
This commit is contained in:
commit
1c769b0991
@ -43,9 +43,11 @@ def format_dict(
|
|||||||
) -> typing.Iterator[TViewLine]:
|
) -> typing.Iterator[TViewLine]:
|
||||||
"""
|
"""
|
||||||
Helper function that transforms the given dictionary into a list of
|
Helper function that transforms the given dictionary into a list of
|
||||||
|
[
|
||||||
("key", key )
|
("key", key )
|
||||||
("value", value)
|
("value", value)
|
||||||
tuples, where key is padded to a uniform width.
|
]
|
||||||
|
entries, where key is padded to a uniform width.
|
||||||
"""
|
"""
|
||||||
max_key_len = max(len(k) for k in d.keys())
|
max_key_len = max(len(k) for k in d.keys())
|
||||||
max_key_len = min(max_key_len, KEY_MAX)
|
max_key_len = min(max_key_len, KEY_MAX)
|
||||||
|
@ -53,8 +53,8 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
|
|||||||
sec_websocket_version="13",
|
sec_websocket_version="13",
|
||||||
sec_websocket_key="1234",
|
sec_websocket_key="1234",
|
||||||
),
|
),
|
||||||
timestamp_start=1,
|
timestamp_start=946681200,
|
||||||
timestamp_end=2,
|
timestamp_end=946681201,
|
||||||
content=b''
|
content=b''
|
||||||
)
|
)
|
||||||
resp = http.HTTPResponse(
|
resp = http.HTTPResponse(
|
||||||
@ -66,8 +66,8 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None,
|
|||||||
upgrade='websocket',
|
upgrade='websocket',
|
||||||
sec_websocket_accept=b'',
|
sec_websocket_accept=b'',
|
||||||
),
|
),
|
||||||
timestamp_start=1,
|
timestamp_start=946681202,
|
||||||
timestamp_end=2,
|
timestamp_end=946681203,
|
||||||
content=b'',
|
content=b'',
|
||||||
)
|
)
|
||||||
handshake_flow = http.HTTPFlow(client_conn, server_conn)
|
handshake_flow = http.HTTPFlow(client_conn, server_conn)
|
||||||
@ -158,9 +158,9 @@ def tclient_conn():
|
|||||||
clientcert=None,
|
clientcert=None,
|
||||||
mitmcert=None,
|
mitmcert=None,
|
||||||
ssl_established=False,
|
ssl_established=False,
|
||||||
timestamp_start=1,
|
timestamp_start=946681200,
|
||||||
timestamp_ssl_setup=2,
|
timestamp_ssl_setup=946681201,
|
||||||
timestamp_end=3,
|
timestamp_end=946681206,
|
||||||
sni="address",
|
sni="address",
|
||||||
cipher_name="cipher",
|
cipher_name="cipher",
|
||||||
alpn_proto_negotiated=b"http/1.1",
|
alpn_proto_negotiated=b"http/1.1",
|
||||||
@ -182,10 +182,10 @@ def tserver_conn():
|
|||||||
source_address=("address", 22),
|
source_address=("address", 22),
|
||||||
ip_address=("192.168.0.1", 22),
|
ip_address=("192.168.0.1", 22),
|
||||||
cert=None,
|
cert=None,
|
||||||
timestamp_start=1,
|
timestamp_start=946681202,
|
||||||
timestamp_tcp_setup=2,
|
timestamp_tcp_setup=946681203,
|
||||||
timestamp_ssl_setup=3,
|
timestamp_ssl_setup=946681204,
|
||||||
timestamp_end=4,
|
timestamp_end=946681205,
|
||||||
ssl_established=False,
|
ssl_established=False,
|
||||||
sni="address",
|
sni="address",
|
||||||
alpn_proto_negotiated=None,
|
alpn_proto_negotiated=None,
|
||||||
|
@ -31,8 +31,8 @@ def treq(**kwargs):
|
|||||||
http_version=b"HTTP/1.1",
|
http_version=b"HTTP/1.1",
|
||||||
headers=http.Headers(((b"header", b"qvalue"), (b"content-length", b"7"))),
|
headers=http.Headers(((b"header", b"qvalue"), (b"content-length", b"7"))),
|
||||||
content=b"content",
|
content=b"content",
|
||||||
timestamp_start=1,
|
timestamp_start=946681200,
|
||||||
timestamp_end=2,
|
timestamp_end=946681201,
|
||||||
)
|
)
|
||||||
default.update(kwargs)
|
default.update(kwargs)
|
||||||
return http.Request(**default)
|
return http.Request(**default)
|
||||||
@ -49,8 +49,8 @@ def tresp(**kwargs):
|
|||||||
reason=b"OK",
|
reason=b"OK",
|
||||||
headers=http.Headers(((b"header-response", b"svalue"), (b"content-length", b"7"))),
|
headers=http.Headers(((b"header-response", b"svalue"), (b"content-length", b"7"))),
|
||||||
content=b"message",
|
content=b"message",
|
||||||
timestamp_start=1,
|
timestamp_start=946681202,
|
||||||
timestamp_end=2,
|
timestamp_end=946681203,
|
||||||
)
|
)
|
||||||
default.update(kwargs)
|
default.update(kwargs)
|
||||||
return http.Response(**default)
|
return http.Response(**default)
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
import platform
|
import platform
|
||||||
|
import typing
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
import urwid
|
import urwid
|
||||||
import urwid.util
|
import urwid.util
|
||||||
|
|
||||||
from functools import lru_cache
|
|
||||||
from mitmproxy.utils import human
|
from mitmproxy.utils import human
|
||||||
|
|
||||||
# Detect Windows Subsystem for Linux
|
# Detect Windows Subsystem for Linux
|
||||||
@ -43,41 +44,48 @@ def highlight_key(str, key, textattr="text", keyattr="key"):
|
|||||||
KEY_MAX = 30
|
KEY_MAX = 30
|
||||||
|
|
||||||
|
|
||||||
def format_keyvals(lst, key="key", val="text", indent=0):
|
def format_keyvals(
|
||||||
|
entries: typing.List[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]],
|
||||||
|
key_format: str = "key",
|
||||||
|
value_format: str = "text",
|
||||||
|
indent: int = 0
|
||||||
|
) -> typing.List[urwid.Columns]:
|
||||||
"""
|
"""
|
||||||
Format a list of (key, value) tuples.
|
Format a list of (key, value) tuples.
|
||||||
|
|
||||||
If key is None, it's treated specially:
|
Args:
|
||||||
- We assume a sub-value, and add an extra indent.
|
entries: The list to format. keys must be strings, values can also be None or urwid widgets.
|
||||||
- The value is treated as a pre-formatted list of directives.
|
The latter makes it possible to use the result of format_keyvals() as a value.
|
||||||
|
key_format: The display attribute for the key.
|
||||||
|
value_format: The display attribute for the value.
|
||||||
|
indent: Additional indent to apply.
|
||||||
"""
|
"""
|
||||||
|
max_key_len = max((len(k) for k, v in entries if k is not None), default=0)
|
||||||
|
max_key_len = min(max_key_len, KEY_MAX)
|
||||||
|
|
||||||
|
if indent > 2:
|
||||||
|
indent -= 2 # We use dividechars=2 below, which already adds two empty spaces
|
||||||
|
|
||||||
ret = []
|
ret = []
|
||||||
if lst:
|
for k, v in entries:
|
||||||
maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX)
|
if v is None:
|
||||||
for i, kv in enumerate(lst):
|
v = urwid.Text("")
|
||||||
if kv is None:
|
elif not isinstance(v, urwid.Widget):
|
||||||
ret.append(urwid.Text(""))
|
v = urwid.Text([(value_format, v)])
|
||||||
else:
|
ret.append(
|
||||||
if isinstance(kv[1], urwid.Widget):
|
urwid.Columns(
|
||||||
v = kv[1]
|
[
|
||||||
elif kv[1] is None:
|
("fixed", indent, urwid.Text("")),
|
||||||
v = urwid.Text("")
|
(
|
||||||
else:
|
"fixed",
|
||||||
v = urwid.Text([(val, kv[1])])
|
max_key_len,
|
||||||
ret.append(
|
urwid.Text([(key_format, k)])
|
||||||
urwid.Columns(
|
),
|
||||||
[
|
v
|
||||||
("fixed", indent, urwid.Text("")),
|
],
|
||||||
(
|
dividechars=2
|
||||||
"fixed",
|
)
|
||||||
maxk,
|
)
|
||||||
urwid.Text([(key, kv[0] or "")])
|
|
||||||
),
|
|
||||||
v
|
|
||||||
],
|
|
||||||
dividechars = 2
|
|
||||||
)
|
|
||||||
)
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
|
||||||
@ -205,19 +213,15 @@ def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False):
|
|||||||
focus=focus,
|
focus=focus,
|
||||||
extended=extended,
|
extended=extended,
|
||||||
max_url_len=max_url_len,
|
max_url_len=max_url_len,
|
||||||
|
intercepted=f.intercepted,
|
||||||
intercepted = f.intercepted,
|
acked=acked,
|
||||||
acked = acked,
|
req_timestamp=f.request.timestamp_start,
|
||||||
|
req_is_replay=f.request.is_replay,
|
||||||
req_timestamp = f.request.timestamp_start,
|
req_method=f.request.method,
|
||||||
req_is_replay = f.request.is_replay,
|
req_url=f.request.pretty_url if hostheader else f.request.url,
|
||||||
req_method = f.request.method,
|
req_http_version=f.request.http_version,
|
||||||
req_url = f.request.pretty_url if hostheader else f.request.url,
|
err_msg=f.error.msg if f.error else None,
|
||||||
req_http_version = f.request.http_version,
|
marked=f.marked,
|
||||||
|
|
||||||
err_msg = f.error.msg if f.error else None,
|
|
||||||
|
|
||||||
marked = f.marked,
|
|
||||||
)
|
)
|
||||||
if f.response:
|
if f.response:
|
||||||
if f.response.raw_content:
|
if f.response.raw_content:
|
||||||
@ -232,11 +236,11 @@ def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False):
|
|||||||
roundtrip = human.pretty_duration(duration)
|
roundtrip = human.pretty_duration(duration)
|
||||||
|
|
||||||
d.update(dict(
|
d.update(dict(
|
||||||
resp_code = f.response.status_code,
|
resp_code=f.response.status_code,
|
||||||
resp_reason = f.response.reason,
|
resp_reason=f.response.reason,
|
||||||
resp_is_replay = f.response.is_replay,
|
resp_is_replay=f.response.is_replay,
|
||||||
resp_clen = contentdesc,
|
resp_clen=contentdesc,
|
||||||
roundtrip = roundtrip,
|
roundtrip=roundtrip,
|
||||||
))
|
))
|
||||||
|
|
||||||
t = f.response.headers.get("content-type")
|
t = f.response.headers.get("content-type")
|
||||||
|
@ -59,7 +59,7 @@ def map(km):
|
|||||||
km.add("M", "view.marked.toggle", ["flowlist"], "Toggle viewing marked flows")
|
km.add("M", "view.marked.toggle", ["flowlist"], "Toggle viewing marked flows")
|
||||||
km.add(
|
km.add(
|
||||||
"n",
|
"n",
|
||||||
"console.command view.create get https://google.com",
|
"console.command view.create get https://example.com/",
|
||||||
["flowlist"],
|
["flowlist"],
|
||||||
"Create a new flow"
|
"Create a new flow"
|
||||||
)
|
)
|
||||||
|
@ -23,157 +23,157 @@ def flowdetails(state, flow: http.HTTPFlow):
|
|||||||
metadata = flow.metadata
|
metadata = flow.metadata
|
||||||
|
|
||||||
if metadata is not None and len(metadata) > 0:
|
if metadata is not None and len(metadata) > 0:
|
||||||
parts = [[str(k), repr(v)] for k, v in metadata.items()]
|
parts = [(str(k), repr(v)) for k, v in metadata.items()]
|
||||||
text.append(urwid.Text([("head", "Metadata:")]))
|
text.append(urwid.Text([("head", "Metadata:")]))
|
||||||
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
|
text.extend(common.format_keyvals(parts, indent=4))
|
||||||
|
|
||||||
if sc is not None and sc.ip_address:
|
if sc is not None and sc.ip_address:
|
||||||
text.append(urwid.Text([("head", "Server Connection:")]))
|
text.append(urwid.Text([("head", "Server Connection:")]))
|
||||||
parts = [
|
parts = [
|
||||||
["Address", human.format_address(sc.address)],
|
("Address", human.format_address(sc.address)),
|
||||||
]
|
]
|
||||||
if sc.ip_address:
|
if sc.ip_address:
|
||||||
parts.append(["Resolved Address", human.format_address(sc.ip_address)])
|
parts.append(("Resolved Address", human.format_address(sc.ip_address)))
|
||||||
if resp:
|
if resp:
|
||||||
parts.append(["HTTP Version", resp.http_version])
|
parts.append(("HTTP Version", resp.http_version))
|
||||||
if sc.alpn_proto_negotiated:
|
if sc.alpn_proto_negotiated:
|
||||||
parts.append(["ALPN", sc.alpn_proto_negotiated])
|
parts.append(("ALPN", sc.alpn_proto_negotiated))
|
||||||
|
|
||||||
text.extend(
|
text.extend(
|
||||||
common.format_keyvals(parts, key="key", val="text", indent=4)
|
common.format_keyvals(parts, indent=4)
|
||||||
)
|
)
|
||||||
|
|
||||||
c = sc.cert
|
c = sc.cert
|
||||||
if c:
|
if c:
|
||||||
text.append(urwid.Text([("head", "Server Certificate:")]))
|
text.append(urwid.Text([("head", "Server Certificate:")]))
|
||||||
parts = [
|
parts = [
|
||||||
["Type", "%s, %s bits" % c.keyinfo],
|
("Type", "%s, %s bits" % c.keyinfo),
|
||||||
["SHA1 digest", c.digest("sha1")],
|
("SHA1 digest", c.digest("sha1")),
|
||||||
["Valid to", str(c.notafter)],
|
("Valid to", str(c.notafter)),
|
||||||
["Valid from", str(c.notbefore)],
|
("Valid from", str(c.notbefore)),
|
||||||
["Serial", str(c.serial)],
|
("Serial", str(c.serial)),
|
||||||
[
|
(
|
||||||
"Subject",
|
"Subject",
|
||||||
urwid.BoxAdapter(
|
urwid.BoxAdapter(
|
||||||
urwid.ListBox(
|
urwid.ListBox(
|
||||||
common.format_keyvals(
|
common.format_keyvals(
|
||||||
c.subject,
|
c.subject,
|
||||||
key="highlight",
|
key_format="highlight"
|
||||||
val="text"
|
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
len(c.subject)
|
len(c.subject)
|
||||||
)
|
)
|
||||||
],
|
),
|
||||||
[
|
(
|
||||||
"Issuer",
|
"Issuer",
|
||||||
urwid.BoxAdapter(
|
urwid.BoxAdapter(
|
||||||
urwid.ListBox(
|
urwid.ListBox(
|
||||||
common.format_keyvals(
|
common.format_keyvals(
|
||||||
c.issuer, key="highlight", val="text"
|
c.issuer,
|
||||||
|
key_format="highlight"
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
len(c.issuer)
|
len(c.issuer)
|
||||||
)
|
)
|
||||||
]
|
)
|
||||||
]
|
]
|
||||||
|
|
||||||
if c.altnames:
|
if c.altnames:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Alt names",
|
"Alt names",
|
||||||
", ".join(strutils.bytes_to_escaped_str(x) for x in c.altnames)
|
", ".join(strutils.bytes_to_escaped_str(x) for x in c.altnames)
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
text.extend(
|
text.extend(
|
||||||
common.format_keyvals(parts, key="key", val="text", indent=4)
|
common.format_keyvals(parts, indent=4)
|
||||||
)
|
)
|
||||||
|
|
||||||
if cc is not None:
|
if cc is not None:
|
||||||
text.append(urwid.Text([("head", "Client Connection:")]))
|
text.append(urwid.Text([("head", "Client Connection:")]))
|
||||||
|
|
||||||
parts = [
|
parts = [
|
||||||
["Address", "{}:{}".format(cc.address[0], cc.address[1])],
|
("Address", "{}:{}".format(cc.address[0], cc.address[1])),
|
||||||
]
|
]
|
||||||
if req:
|
if req:
|
||||||
parts.append(["HTTP Version", req.http_version])
|
parts.append(("HTTP Version", req.http_version))
|
||||||
if cc.tls_version:
|
if cc.tls_version:
|
||||||
parts.append(["TLS Version", cc.tls_version])
|
parts.append(("TLS Version", cc.tls_version))
|
||||||
if cc.sni:
|
if cc.sni:
|
||||||
parts.append(["Server Name Indication", cc.sni])
|
parts.append(("Server Name Indication", cc.sni))
|
||||||
if cc.cipher_name:
|
if cc.cipher_name:
|
||||||
parts.append(["Cipher Name", cc.cipher_name])
|
parts.append(("Cipher Name", cc.cipher_name))
|
||||||
if cc.alpn_proto_negotiated:
|
if cc.alpn_proto_negotiated:
|
||||||
parts.append(["ALPN", cc.alpn_proto_negotiated])
|
parts.append(("ALPN", cc.alpn_proto_negotiated))
|
||||||
|
|
||||||
text.extend(
|
text.extend(
|
||||||
common.format_keyvals(parts, key="key", val="text", indent=4)
|
common.format_keyvals(parts, indent=4)
|
||||||
)
|
)
|
||||||
|
|
||||||
parts = []
|
parts = []
|
||||||
|
|
||||||
if cc is not None and cc.timestamp_start:
|
if cc is not None and cc.timestamp_start:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Client conn. established",
|
"Client conn. established",
|
||||||
maybe_timestamp(cc, "timestamp_start")
|
maybe_timestamp(cc, "timestamp_start")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
if cc.ssl_established:
|
if cc.ssl_established:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Client conn. TLS handshake",
|
"Client conn. TLS handshake",
|
||||||
maybe_timestamp(cc, "timestamp_ssl_setup")
|
maybe_timestamp(cc, "timestamp_ssl_setup")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if sc is not None and sc.timestamp_start:
|
if sc is not None and sc.timestamp_start:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Server conn. initiated",
|
"Server conn. initiated",
|
||||||
maybe_timestamp(sc, "timestamp_start")
|
maybe_timestamp(sc, "timestamp_start")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Server conn. TCP handshake",
|
"Server conn. TCP handshake",
|
||||||
maybe_timestamp(sc, "timestamp_tcp_setup")
|
maybe_timestamp(sc, "timestamp_tcp_setup")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
if sc.ssl_established:
|
if sc.ssl_established:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Server conn. TLS handshake",
|
"Server conn. TLS handshake",
|
||||||
maybe_timestamp(sc, "timestamp_ssl_setup")
|
maybe_timestamp(sc, "timestamp_ssl_setup")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if req is not None and req.timestamp_start:
|
if req is not None and req.timestamp_start:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"First request byte",
|
"First request byte",
|
||||||
maybe_timestamp(req, "timestamp_start")
|
maybe_timestamp(req, "timestamp_start")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Request complete",
|
"Request complete",
|
||||||
maybe_timestamp(req, "timestamp_end")
|
maybe_timestamp(req, "timestamp_end")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if resp is not None and resp.timestamp_start:
|
if resp is not None and resp.timestamp_start:
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"First response byte",
|
"First response byte",
|
||||||
maybe_timestamp(resp, "timestamp_start")
|
maybe_timestamp(resp, "timestamp_start")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
parts.append(
|
parts.append(
|
||||||
[
|
(
|
||||||
"Response complete",
|
"Response complete",
|
||||||
maybe_timestamp(resp, "timestamp_end")
|
maybe_timestamp(resp, "timestamp_end")
|
||||||
]
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if parts:
|
if parts:
|
||||||
@ -181,6 +181,6 @@ def flowdetails(state, flow: http.HTTPFlow):
|
|||||||
parts = sorted(parts, key=lambda p: p[1])
|
parts = sorted(parts, key=lambda p: p[1])
|
||||||
|
|
||||||
text.append(urwid.Text([("head", "Timing:")]))
|
text.append(urwid.Text([("head", "Timing:")]))
|
||||||
text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
|
text.extend(common.format_keyvals(parts, indent=4))
|
||||||
|
|
||||||
return searchable.Searchable(text)
|
return searchable.Searchable(text)
|
||||||
|
@ -13,6 +13,7 @@ from mitmproxy.tools.console import flowdetailview
|
|||||||
from mitmproxy.tools.console import searchable
|
from mitmproxy.tools.console import searchable
|
||||||
from mitmproxy.tools.console import tabs
|
from mitmproxy.tools.console import tabs
|
||||||
import mitmproxy.tools.console.master # noqa
|
import mitmproxy.tools.console.master # noqa
|
||||||
|
from mitmproxy.utils import strutils
|
||||||
|
|
||||||
|
|
||||||
class SearchError(Exception):
|
class SearchError(Exception):
|
||||||
@ -152,10 +153,31 @@ class FlowDetails(tabs.Tabs):
|
|||||||
|
|
||||||
def conn_text(self, conn):
|
def conn_text(self, conn):
|
||||||
if conn:
|
if conn:
|
||||||
|
hdrs = []
|
||||||
|
for k, v in conn.headers.fields:
|
||||||
|
# This will always force an ascii representation of headers. For example, if the server sends a
|
||||||
|
#
|
||||||
|
# X-Authors: Made with ❤ in Hamburg
|
||||||
|
#
|
||||||
|
# header, mitmproxy will display the following:
|
||||||
|
#
|
||||||
|
# X-Authors: Made with \xe2\x9d\xa4 in Hamburg.
|
||||||
|
#
|
||||||
|
# The alternative would be to just use the header's UTF-8 representation and maybe
|
||||||
|
# do `str.replace("\t", "\\t")` to exempt tabs from urwid's special characters escaping [1].
|
||||||
|
# That would in some terminals allow rendering UTF-8 characters, but the mapping
|
||||||
|
# wouldn't be bijective, i.e. a user couldn't distinguish "\\t" and "\t".
|
||||||
|
# Also, from a security perspective, a mitmproxy user couldn't be fooled by homoglyphs.
|
||||||
|
#
|
||||||
|
# 1) https://github.com/mitmproxy/mitmproxy/issues/1833
|
||||||
|
# https://github.com/urwid/urwid/blob/6608ee2c9932d264abd1171468d833b7a4082e13/urwid/display_common.py#L35-L36,
|
||||||
|
|
||||||
|
k = strutils.bytes_to_escaped_str(k) + ":"
|
||||||
|
v = strutils.bytes_to_escaped_str(v)
|
||||||
|
hdrs.append((k, v))
|
||||||
txt = common.format_keyvals(
|
txt = common.format_keyvals(
|
||||||
[(h + ":", v) for (h, v) in conn.headers.items(multi=True)],
|
hdrs,
|
||||||
key = "header",
|
key_format="header"
|
||||||
val = "text"
|
|
||||||
)
|
)
|
||||||
viewmode = self.master.commands.call("console.flowview.mode")
|
viewmode = self.master.commands.call("console.flowview.mode")
|
||||||
msg, body = self.content_view(viewmode, conn)
|
msg, body = self.content_view(viewmode, conn)
|
||||||
|
@ -76,7 +76,7 @@ class HelpView(tabs.Tabs, layoutwidget.LayoutWidget):
|
|||||||
|
|
||||||
def filtexp(self):
|
def filtexp(self):
|
||||||
text = []
|
text = []
|
||||||
text.extend(common.format_keyvals(flowfilter.help, key="key", val="text", indent=4))
|
text.extend(common.format_keyvals(flowfilter.help, indent=4))
|
||||||
text.append(
|
text.append(
|
||||||
urwid.Text(
|
urwid.Text(
|
||||||
[
|
[
|
||||||
@ -96,7 +96,7 @@ class HelpView(tabs.Tabs, layoutwidget.LayoutWidget):
|
|||||||
("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
|
("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
|
||||||
]
|
]
|
||||||
text.extend(
|
text.extend(
|
||||||
common.format_keyvals(examples, key="key", val="text", indent=4)
|
common.format_keyvals(examples, indent=4)
|
||||||
)
|
)
|
||||||
return CListBox(text)
|
return CListBox(text)
|
||||||
|
|
||||||
|
@ -23,8 +23,8 @@ def test_extract():
|
|||||||
["request.text", "content"],
|
["request.text", "content"],
|
||||||
["request.content", b"content"],
|
["request.content", b"content"],
|
||||||
["request.raw_content", b"content"],
|
["request.raw_content", b"content"],
|
||||||
["request.timestamp_start", "1"],
|
["request.timestamp_start", "946681200"],
|
||||||
["request.timestamp_end", "2"],
|
["request.timestamp_end", "946681201"],
|
||||||
["request.header[header]", "qvalue"],
|
["request.header[header]", "qvalue"],
|
||||||
|
|
||||||
["response.status_code", "200"],
|
["response.status_code", "200"],
|
||||||
@ -33,8 +33,8 @@ def test_extract():
|
|||||||
["response.content", b"message"],
|
["response.content", b"message"],
|
||||||
["response.raw_content", b"message"],
|
["response.raw_content", b"message"],
|
||||||
["response.header[header-response]", "svalue"],
|
["response.header[header-response]", "svalue"],
|
||||||
["response.timestamp_start", "1"],
|
["response.timestamp_start", "946681202"],
|
||||||
["response.timestamp_end", "2"],
|
["response.timestamp_end", "946681203"],
|
||||||
|
|
||||||
["client_conn.address.port", "22"],
|
["client_conn.address.port", "22"],
|
||||||
["client_conn.address.host", "127.0.0.1"],
|
["client_conn.address.host", "127.0.0.1"],
|
||||||
@ -49,10 +49,9 @@ def test_extract():
|
|||||||
["server_conn.sni", "address"],
|
["server_conn.sni", "address"],
|
||||||
["server_conn.ssl_established", "false"],
|
["server_conn.ssl_established", "false"],
|
||||||
]
|
]
|
||||||
for t in tests:
|
for spec, expected in tests:
|
||||||
ret = cut.extract(t[0], tf)
|
ret = cut.extract(spec, tf)
|
||||||
if ret != t[1]:
|
assert spec and ret == expected
|
||||||
raise AssertionError("%s: Expected %s, got %s" % (t[0], t[1], ret))
|
|
||||||
|
|
||||||
with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f:
|
with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f:
|
||||||
d = f.read()
|
d = f.read()
|
||||||
|
@ -41,7 +41,7 @@ def test_order_generators():
|
|||||||
tf = tflow.tflow(resp=True)
|
tf = tflow.tflow(resp=True)
|
||||||
|
|
||||||
rs = view.OrderRequestStart(v)
|
rs = view.OrderRequestStart(v)
|
||||||
assert rs.generate(tf) == 1
|
assert rs.generate(tf) == 946681200
|
||||||
|
|
||||||
rm = view.OrderRequestMethod(v)
|
rm = view.OrderRequestMethod(v)
|
||||||
assert rm.generate(tf) == tf.request.method
|
assert rm.generate(tf) == tf.request.method
|
||||||
|
@ -150,10 +150,10 @@ class TestResponseUtils:
|
|||||||
n = time.time()
|
n = time.time()
|
||||||
r.headers["date"] = email.utils.formatdate(n)
|
r.headers["date"] = email.utils.formatdate(n)
|
||||||
pre = r.headers["date"]
|
pre = r.headers["date"]
|
||||||
r.refresh(1)
|
r.refresh(946681202)
|
||||||
assert pre == r.headers["date"]
|
assert pre == r.headers["date"]
|
||||||
|
|
||||||
r.refresh(61)
|
r.refresh(946681262)
|
||||||
d = email.utils.parsedate_tz(r.headers["date"])
|
d = email.utils.parsedate_tz(r.headers["date"])
|
||||||
d = email.utils.mktime_tz(d)
|
d = email.utils.mktime_tz(d)
|
||||||
# Weird that this is not exact...
|
# Weird that this is not exact...
|
||||||
|
@ -1,12 +1,34 @@
|
|||||||
|
import urwid
|
||||||
|
|
||||||
from mitmproxy.test import tflow
|
from mitmproxy.test import tflow
|
||||||
from mitmproxy.tools.console import common
|
from mitmproxy.tools.console import common
|
||||||
|
|
||||||
from ....conftest import skip_appveyor
|
|
||||||
|
|
||||||
|
|
||||||
@skip_appveyor
|
|
||||||
def test_format_flow():
|
def test_format_flow():
|
||||||
f = tflow.tflow(resp=True)
|
f = tflow.tflow(resp=True)
|
||||||
assert common.format_flow(f, True)
|
assert common.format_flow(f, True)
|
||||||
assert common.format_flow(f, True, hostheader=True)
|
assert common.format_flow(f, True, hostheader=True)
|
||||||
assert common.format_flow(f, True, extended=True)
|
assert common.format_flow(f, True, extended=True)
|
||||||
|
|
||||||
|
|
||||||
|
def test_format_keyvals():
|
||||||
|
assert common.format_keyvals(
|
||||||
|
[
|
||||||
|
("aa", "bb"),
|
||||||
|
("cc", "dd"),
|
||||||
|
("ee", None),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
wrapped = urwid.BoxAdapter(
|
||||||
|
urwid.ListBox(
|
||||||
|
urwid.SimpleFocusListWalker(
|
||||||
|
common.format_keyvals([("foo", "bar")])
|
||||||
|
)
|
||||||
|
), 1
|
||||||
|
)
|
||||||
|
assert wrapped.render((30, ))
|
||||||
|
assert common.format_keyvals(
|
||||||
|
[
|
||||||
|
("aa", wrapped)
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@ -4,22 +4,9 @@ from mitmproxy import options
|
|||||||
from mitmproxy.test import tflow
|
from mitmproxy.test import tflow
|
||||||
from mitmproxy.test import tutils
|
from mitmproxy.test import tutils
|
||||||
from mitmproxy.tools import console
|
from mitmproxy.tools import console
|
||||||
from mitmproxy.tools.console import common
|
|
||||||
from ... import tservers
|
from ... import tservers
|
||||||
|
|
||||||
|
|
||||||
def test_format_keyvals():
|
|
||||||
assert common.format_keyvals(
|
|
||||||
[
|
|
||||||
("aa", "bb"),
|
|
||||||
None,
|
|
||||||
("cc", "dd"),
|
|
||||||
(None, "dd"),
|
|
||||||
(None, "dd"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_options():
|
def test_options():
|
||||||
assert options.Options(replay_kill_extra=True)
|
assert options.Options(replay_kill_extra=True)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user