diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py index 5d7ee09d5..49f5c247d 100644 --- a/mitmproxy/tools/console/common.py +++ b/mitmproxy/tools/console/common.py @@ -1,5 +1,8 @@ import platform import typing +import datetime +import time +import math from functools import lru_cache import urwid @@ -97,12 +100,171 @@ if urwid.util.detected_encoding and not IS_WSL: SYMBOL_MARK = u"\u25cf" SYMBOL_UP = u"\u21E7" SYMBOL_DOWN = u"\u21E9" + SYMBOL_ELLIPSIS = u"\u2026" else: SYMBOL_REPLAY = u"[r]" SYMBOL_RETURN = u"<-" SYMBOL_MARK = "[m]" SYMBOL_UP = "^" SYMBOL_DOWN = " " + SYMBOL_ELLIPSIS = "~" + + +def fixlen(s, maxlen): + if len(s) <= maxlen: + return s.ljust(maxlen) + else: + return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS + + +def fixlen_r(s, maxlen): + if len(s) <= maxlen: + return s.rjust(maxlen) + else: + return SYMBOL_ELLIPSIS + s[len(s) - maxlen + len(SYMBOL_ELLIPSIS):] + + +class TruncatedText(urwid.Widget): + def __init__(self, text, attr, align='left'): + self.text = text + self.attr = attr + self.align = align + super(TruncatedText, self).__init__() + + def pack(self, size, focus=False): + return (len(self.text), 1) + + def rows(self, size, focus=False): + return 1 + + def render(self, size, focus=False): + text = self.text + attr = self.attr + if self.align == 'right': + text = text[::-1] + attr = attr[::-1] + + text_len = len(text) # TODO: unicode? + if size is not None and len(size) > 0: + width = size[0] + else: + width = text_len + + if width >= text_len: + remaining = width - text_len + if remaining > 0: + c_text = text + ' ' * remaining + c_attr = attr + [('text', remaining)] + else: + c_text = text + c_attr = attr + else: + visible_len = width - len(SYMBOL_ELLIPSIS) + visible_text = text[0:visible_len] + c_text = visible_text + SYMBOL_ELLIPSIS + c_attr = (urwid.util.rle_subseg(attr, 0, len(visible_text.encode())) + + [('focus', len(SYMBOL_ELLIPSIS.encode()))]) + + if self.align == 'right': + c_text = c_text[::-1] + c_attr = c_attr[::-1] + + return urwid.TextCanvas([c_text.encode()], [c_attr], maxcol=width) + + +def truncated_plain(text, attr, align='left'): + return TruncatedText(text, [(attr, len(text.encode()))], align) + + +# Work around https://github.com/urwid/urwid/pull/330 +def rle_append_beginning_modify(rle, a_r): + """ + Append (a, r) (unpacked from *a_r*) to BEGINNING of rle. + Merge with first run when possible + + MODIFIES rle parameter contents. Returns None. + """ + a, r = a_r + if not rle: + rle[:] = [(a, r)] + else: + al, run = rle[0] + if a == al: + rle[0] = (a, run + r) + else: + rle[0:0] = [(a, r)] + + +def colorize_host(s): + if len(s) == 0 or s[0] == '[' or s.split('.')[-1].isdigit(): + main_part = -1 + else: + main_part = 1 # TODO: second-level domains (https://publicsuffix.org/list/) + part = 0 + attr = [] + for i in reversed(range(len(s))): + c = s[i] + if c == '.': + part += 1 + if c in ".:[]": + a = 'url_punctuation' + elif part == main_part: + a = 'url_domain' + else: + a = 'text' + rle_append_beginning_modify(attr, (a, len(c.encode()))) + return attr + + +def colorize_req(s): + path = s.split('?', 2)[0] + i_query = len(path) + i_last_slash = path.rfind('/') + i_ext = path[i_last_slash + 1:].rfind('.') + i_ext = i_last_slash + i_ext if i_ext >= 0 else len(s) + in_val = False + attr = [] + for i in range(len(s)): + c = s[i] + if ((i < i_query and c == '/') or + (i < i_query and i > i_last_slash and c == '.') or + (i == i_query)): + a = 'url_punctuation' + elif i > i_query: + if in_val: + if c == '&': + in_val = False + a = 'url_punctuation' + else: + a = 'url_query_value' + else: + if c == '=': + in_val = True + a = 'url_punctuation' + else: + a = 'url_query_key' + elif i > i_ext: + a = 'url_extension' + elif i > i_last_slash: + a = 'url_filename' + else: + a = 'text' + urwid.util.rle_append_modify(attr, (a, len(c.encode()))) + return attr + + +def colorize_url(url): + parts = url.split('/', 3) + if len(parts) < 4 or len(parts[1]) > 0 or parts[0][-1:] != ':': + return [('error', len(url))] # bad URL + schemes = { + 'http:': 'scheme_http', + 'https:': 'scheme_https', + } + return [ + (schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1), + ('url_punctuation', 3), # :// + ] + colorize_host(parts[2]) + colorize_req('/' + parts[3]) @lru_cache(maxsize=800) @@ -110,50 +272,71 @@ def raw_format_flow(f): f = dict(f) pile = [] req = [] - if f["extended"]: - req.append( - fcol( - human.format_timestamp(f["req_timestamp"]), - "highlight" - ) - ) - else: - req.append(fcol(">>" if f["focus"] else " ", "focus")) - if f["marked"]: - req.append(fcol(SYMBOL_MARK, "mark")) + cursor = [' ', 'focus'] + if f.get('resp_is_replay', False): + cursor[0] = SYMBOL_REPLAY + cursor[1] = 'replay' + if f['marked']: + if cursor[0] == ' ': + cursor[0] = SYMBOL_MARK + cursor[1] = 'mark' + if f['focus']: + cursor[0] = '>' - if f["req_is_replay"]: - req.append(fcol(SYMBOL_REPLAY, "replay")) + req.append(fcol(*cursor)) - req.append(fcol(f["req_method"], "method")) + if f["two_line"]: + req.append(TruncatedText(f["req_url"], colorize_url(f["req_url"]), 'left')) + pile.append(urwid.Columns(req, dividechars=1)) - preamble = sum(i[1] for i in req) + len(req) - 1 + req = [] + req.append(fcol(' ', 'text')) if f["intercepted"] and not f["acked"]: uc = "intercept" - elif "resp_code" in f or "err_msg" in f: - uc = "text" + elif "resp_code" in f or f["err_msg"] is not None: + uc = "highlight" else: uc = "title" - url = f["req_url"] + if f["extended"]: + s = human.format_timestamp(f["req_timestamp"]) + else: + s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(f["req_timestamp"]))).strftime("%H:%M:%S") + req.append(fcol(s, uc)) - if f["max_url_len"] and len(url) > f["max_url_len"]: - url = url[:f["max_url_len"]] + "…" + methods = { + 'GET': 'method_get', + 'POST': 'method_post', + } + uc = methods.get(f["req_method"], "method_other") + if f['extended']: + req.append(fcol(f["req_method"], uc)) + if f["req_promise"]: + req.append(fcol('PUSH_PROMISE', 'method_http2_push')) + else: + if f["req_promise"]: + uc = 'method_http2_push' + req.append(("fixed", 4, truncated_plain(f["req_method"], uc))) - if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): - url += " " + f["req_http_version"] - req.append( - urwid.Text([(uc, url)]) - ) + if f["two_line"]: + req.append(fcol(f["req_http_version"], 'text')) + else: + schemes = { + 'http': 'scheme_http', + 'https': 'scheme_https', + } + req.append(fcol(fixlen(f["req_scheme"].upper(), 5), schemes.get(f["req_scheme"], "scheme_other"))) - pile.append(urwid.Columns(req, dividechars=1)) + req.append(('weight', 0.25, TruncatedText(f["req_host"], colorize_host(f["req_host"]), 'right'))) + req.append(('weight', 1.0, TruncatedText(f["req_path"], colorize_req(f["req_path"]), 'left'))) - resp = [] - resp.append( - ("fixed", preamble, urwid.Text("")) - ) + ret = (' ' * len(SYMBOL_RETURN), 'text') + status = ('', 'text') + content = ('', 'text') + size = ('', 'text') + duration = ('', 'text') if "resp_code" in f: codes = { @@ -163,79 +346,112 @@ def raw_format_flow(f): 5: "code_500", } ccol = codes.get(f["resp_code"] // 100, "code_other") - resp.append(fcol(SYMBOL_RETURN, ccol)) - if f["resp_is_replay"]: - resp.append(fcol(SYMBOL_REPLAY, "replay")) - resp.append(fcol(f["resp_code"], ccol)) - if f["extended"]: - resp.append(fcol(f["resp_reason"], ccol)) - if f["intercepted"] and f["resp_code"] and not f["acked"]: - rc = "intercept" - else: - rc = "text" + ret = (SYMBOL_RETURN, ccol) + status = (str(f["resp_code"]), ccol) - if f["resp_ctype"]: - resp.append(fcol(f["resp_ctype"], rc)) - resp.append(fcol(f["resp_clen"], rc)) - resp.append(fcol(f["roundtrip"], rc)) + if f["resp_len"] < 0: + if f["intercepted"] and f["resp_code"] and not f["acked"]: + rc = "intercept" + else: + rc = "content_none" + + if f["resp_len"] == -1: + contentdesc = "[content missing]" + else: + contentdesc = "[no content]" + content = (contentdesc, rc) + else: + if f["resp_ctype"]: + ctype = f["resp_ctype"].split(";")[0] + if ctype.endswith('/javascript'): + rc = 'content_script' + elif ctype.startswith('text/'): + rc = 'content_text' + elif (ctype.startswith('image/') or + ctype.startswith('video/') or + ctype.startswith('font/') or + "/x-font-" in ctype): + rc = 'content_media' + elif ctype.endswith('/json') or ctype.endswith('/xml'): + rc = 'content_data' + elif ctype.startswith('application/'): + rc = 'content_raw' + else: + rc = 'content_other' + content = (ctype, rc) + + rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + f["resp_len"]) / 20, 0.99)) + + size_str = human.pretty_size(f["resp_len"]) + if not f['extended']: + # shorten to 5 chars max + if len(size_str) > 5: + size_str = size_str[0:4].rstrip('.') + size_str[-1:] + size = (size_str, rc) + + if f['duration'] is not None: + rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * f['duration']) / 12, 0.99)) + duration = (human.pretty_duration(f['duration']), rc) elif f["err_msg"]: - resp.append(fcol(SYMBOL_RETURN, "error")) - resp.append( - urwid.Text([ - ( - "error", - f["err_msg"] - ) - ]) - ) - pile.append(urwid.Columns(resp, dividechars=1)) + status = ('Err', 'error') + content = f["err_msg"], 'error' + + if f["two_line"]: + req.append(fcol(*ret)) + req.append(fcol(fixlen(status[0], 3), status[1])) + req.append(('weight', 0.15, truncated_plain(content[0], content[1], 'right'))) + if f['extended']: + req.append(fcol(*size)) + else: + req.append(fcol(fixlen_r(size[0], 5), size[1])) + req.append(fcol(fixlen_r(duration[0], 5), duration[1])) + + pile.append(urwid.Columns(req, dividechars=1, min_width=15)) + return urwid.Pile(pile) -def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False): +def format_flow(f, focus, extended=False, hostheader=False, cols=False): acked = False if f.reply and f.reply.state == "committed": acked = True - pushed = ' PUSH_PROMISE' if 'h2-pushed-stream' in f.metadata else '' d = dict( focus=focus, extended=extended, - max_url_len=max_url_len, + two_line=extended or cols < 100, intercepted=f.intercepted, acked=acked, req_timestamp=f.request.timestamp_start, req_is_replay=f.request.is_replay, - req_method=f.request.method + pushed, + req_method=f.request.method, + req_promise='h2-pushed-stream' in f.metadata, req_url=f.request.pretty_url if hostheader else f.request.url, + req_scheme=f.request.scheme, + req_host=f.request.pretty_host if hostheader else f.request.host, + req_path=f.request.path, req_http_version=f.request.http_version, err_msg=f.error.msg if f.error else None, marked=f.marked, ) if f.response: if f.response.raw_content: - contentdesc = human.pretty_size(len(f.response.raw_content)) + content_len = len(f.response.raw_content) elif f.response.raw_content is None: - contentdesc = "[content missing]" + content_len = -1 else: - contentdesc = "[no content]" - duration = 0 + content_len = -2 + duration = None if f.response.timestamp_end and f.request.timestamp_start: duration = f.response.timestamp_end - f.request.timestamp_start - roundtrip = human.pretty_duration(duration) d.update(dict( resp_code=f.response.status_code, resp_reason=f.response.reason, resp_is_replay=f.response.is_replay, - resp_clen=contentdesc, - roundtrip=roundtrip, + resp_len=content_len, + resp_ctype=f.response.headers.get("content-type"), + duration=duration, )) - t = f.response.headers.get("content-type") - if t: - d["resp_ctype"] = t.split(";")[0] - else: - d["resp_ctype"] = "" - return raw_format_flow(tuple(sorted(d.items()))) diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index e947a5827..63e673270 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -18,7 +18,7 @@ class FlowItem(urwid.WidgetWrap): self.flow, self.flow is self.master.view.focus.flow, hostheader=self.master.options.showhost, - max_url_len=cols, + cols=cols, ) def selectable(self): diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py index b4e3876f1..5466319ab 100644 --- a/mitmproxy/tools/console/flowview.py +++ b/mitmproxy/tools/console/flowview.py @@ -38,7 +38,7 @@ class FlowViewHeader(urwid.WidgetWrap): False, extended=True, hostheader=self.master.options.showhost, - max_url_len=cols, + cols=cols, ) else: self._w = urwid.Pile([]) diff --git a/mitmproxy/tools/console/palettes.py b/mitmproxy/tools/console/palettes.py index 7930c4a31..405f1a6c4 100644 --- a/mitmproxy/tools/console/palettes.py +++ b/mitmproxy/tools/console/palettes.py @@ -22,7 +22,12 @@ class Palette: 'option_selected_key', # List and Connections - 'method', 'focus', + 'method', + 'method_get', 'method_post', 'method_other', 'method_http2_push', + 'scheme_http', 'scheme_https', 'scheme_other', + 'url_punctuation', 'url_domain', 'url_filename', 'url_extension', 'url_query_key', 'url_query_value', + 'content_none', 'content_text', 'content_script', 'content_media', 'content_data', 'content_raw', 'content_other', + 'focus', 'code_200', 'code_300', 'code_400', 'code_500', 'code_other', 'error', "warn", "alert", 'header', 'highlight', 'intercept', 'replay', 'mark', @@ -36,6 +41,7 @@ class Palette: # Commander 'commander_command', 'commander_invalid', 'commander_hint' ] + _fields.extend(['gradient_%02d' % i for i in range(100)]) high: typing.Mapping[str, typing.Sequence[str]] = None def palette(self, transparent): @@ -68,6 +74,27 @@ class Palette: return l +def gen_gradient(palette, cols): + for i in range(100): + palette['gradient_%02d' % i] = (cols[i * len(cols) // 100], 'default') + + +def gen_rgb_gradient(palette, cols): + parts = len(cols) - 1 + for i in range(100): + p = i / 100 + idx = int(p * parts) + t0 = cols[idx] + t1 = cols[idx + 1] + pp = p * parts % 1 + t = ( + round(t0[0] + (t1[0] - t0[0]) * pp), + round(t0[1] + (t1[1] - t0[1]) * pp), + round(t0[2] + (t1[2] - t0[2]) * pp), + ) + palette['gradient_%02d' % i] = ("#%x%x%x" % t, 'default') + + class LowDark(Palette): """ @@ -95,6 +122,30 @@ class LowDark(Palette): # List and Connections method = ('dark cyan', 'default'), + method_get = ('dark cyan', 'default'), + method_post = ('dark red', 'default'), + method_other = ('dark magenta', 'default'), + method_http2_push = ('dark gray', 'default'), + + scheme_http = ('dark cyan', 'default'), + scheme_https = ('dark green', 'default'), + scheme_other = ('dark magenta', 'default'), + + url_punctuation = ('dark gray', 'default'), + url_domain = ('white', 'default'), + url_filename = ('dark cyan', 'default'), + url_extension = ('light gray', 'default'), + url_query_key = ('white', 'default'), + url_query_value = ('light gray', 'default'), + + content_none = ('dark gray', 'default'), + content_text = ('light gray', 'default'), + content_script = ('dark green', 'default'), + content_media = ('light blue', 'default'), + content_data = ('brown', 'default'), + content_raw = ('dark red', 'default'), + content_other = ('dark magenta', 'default'), + focus = ('yellow', 'default'), code_200 = ('dark green', 'default'), @@ -127,6 +178,7 @@ class LowDark(Palette): commander_invalid = ('light red', 'default'), commander_hint = ('dark gray', 'default'), ) + gen_gradient(low, ['light red', 'yellow', 'light green', 'dark green', 'dark cyan', 'dark blue']) class Dark(LowDark): @@ -312,8 +364,20 @@ class SolarizedDark(LowDark): # List and Connections method = (sol_cyan, 'default'), + method_http2_push = (sol_base01, 'default'), focus = (sol_base1, 'default'), + url_punctuation = ('h242', 'default'), + url_domain = ('h252', 'default'), + url_filename = ('h132', 'default'), + url_extension = ('h96', 'default'), + url_query_key = ('h37', 'default'), + url_query_value = ('h30', 'default'), + + content_none = (sol_base01, 'default'), + content_text = (sol_base1, 'default'), + content_media = (sol_blue, 'default'), + code_200 = (sol_green, 'default'), code_300 = (sol_blue, 'default'), code_400 = (sol_orange, 'default',), @@ -342,6 +406,7 @@ class SolarizedDark(LowDark): commander_invalid = (sol_orange, 'default'), commander_hint = (sol_base00, 'default'), ) + gen_rgb_gradient(high, [(15, 0, 0), (15, 15, 0), (0, 15, 0), (0, 15, 15), (0, 0, 15)]) DEFAULT = "dark"