From d33857588cc8351da17fed9ea2486b8b193bd8b8 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sat, 11 Apr 2020 23:59:31 +0200 Subject: [PATCH] fixup flow rendering this was a convoluted mess before and a nightmare to maintain. the new implementation is a bit more verbose, but it can be type-checked for errors. --- mitmproxy/tools/console/common.py | 762 +++++++++++++---------- mitmproxy/tools/console/consoleaddons.py | 2 +- mitmproxy/tools/console/flowlist.py | 11 +- mitmproxy/tools/console/flowview.py | 5 +- mitmproxy/tools/console/palettes.py | 10 +- 5 files changed, 449 insertions(+), 341 deletions(-) diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py index 9ed5795f0..ccfda1866 100644 --- a/mitmproxy/tools/console/common.py +++ b/mitmproxy/tools/console/common.py @@ -1,3 +1,4 @@ +import enum import platform import typing import datetime @@ -9,6 +10,8 @@ from publicsuffix2 import get_sld, get_tld import urwid import urwid.util +from mitmproxy import flow +from mitmproxy.http import HTTPFlow from mitmproxy.utils import human from mitmproxy.tcp import TCPFlow @@ -83,7 +86,7 @@ def format_keyvals( return ret -def fcol(s, attr): +def fcol(s: str, attr: str) -> typing.Tuple[str, int, urwid.Text]: s = str(s) return ( "fixed", @@ -106,20 +109,48 @@ if urwid.util.detected_encoding: else: SYMBOL_REPLAY = u"[r]" SYMBOL_RETURN = u"<-" - SYMBOL_MARK = "[m]" + SYMBOL_MARK = "#" SYMBOL_UP = "^" SYMBOL_DOWN = " " SYMBOL_ELLIPSIS = "~" +SCHEME_STYLES = { + 'http': 'scheme_http', + 'https': 'scheme_https', + 'tcp': 'scheme_tcp', +} +HTTP_REQUEST_METHOD_STYLES = { + 'GET': 'method_get', + 'POST': 'method_post', + 'DELETE': 'method_delete', + 'HEAD': 'method_head', + 'PUT': 'method_put' +} +HTTP_RESPONSE_CODE_STYLE = { + 2: "code_200", + 3: "code_300", + 4: "code_400", + 5: "code_500", +} -def fixlen(s, maxlen): + +class RenderMode(enum.Enum): + TABLE = 1 + """The flow list in table format, i.e. one row per flow.""" + LIST = 2 + """The flow list in list format, i.e. potentially multiple rows per flow.""" + DETAILVIEW = 3 + """The top lines in the detail view.""" + + +def fixlen(s: str, maxlen: int) -> str: if len(s) <= maxlen: return s.ljust(maxlen) else: return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS -def fixlen_r(s, maxlen): +def fixlen_r(s: str, maxlen: int) -> str: if len(s) <= maxlen: return s.rjust(maxlen) else: @@ -234,8 +265,8 @@ def colorize_req(s): for i in range(len(s)): c = s[i] if ((i < i_query and c == '/') or - (i < i_query and i > i_last_slash and c == '.') or - (i == i_query)): + (i < i_query and i > i_last_slash and c == '.') or + (i == i_query)): a = 'url_punctuation' elif i > i_query: if in_val: @@ -269,351 +300,428 @@ def colorize_url(url): 'https:': 'scheme_https', } return [ - (schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1), - ('url_punctuation', 3), # :// - ] + colorize_host(parts[2]) + colorize_req('/' + parts[3]) + (schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1), + ('url_punctuation', 3), # :// + ] + colorize_host(parts[2]) + colorize_req('/' + parts[3]) + + +def format_http_content_type(content_type: str) -> typing.Tuple[str, str]: + content_type = content_type.split(";")[0] + if content_type.endswith('/javascript'): + style = 'content_script' + elif content_type.startswith('text/'): + style = 'content_text' + elif (content_type.startswith('image/') or + content_type.startswith('video/') or + content_type.startswith('font/') or + "/x-font-" in content_type): + style = 'content_media' + elif content_type.endswith('/json') or content_type.endswith('/xml'): + style = 'content_data' + elif content_type.startswith('application/'): + style = 'content_raw' + else: + style = 'content_other' + return content_type, style + + +def format_duration(duration: float) -> typing.Tuple[str, str]: + pretty_duration = human.pretty_duration(duration) + style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * duration) / 12, 0.99)) + return pretty_duration, style + + +def format_size(num_bytes: int) -> typing.Tuple[str, str]: + pretty_size = human.pretty_size(num_bytes) + style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + num_bytes) / 20, 0.99)) + return pretty_size, style + + +def format_left_indicators( + *, + focused: bool, + intercepted: bool, + timestamp: float +): + indicators = [] + if focused: + indicators.append(("focus", ">>")) + else: + indicators.append(" ") + pretty_timestamp = human.format_timestamp(timestamp)[-8:] + if intercepted: + indicators.append(("intercept", pretty_timestamp)) + else: + indicators.append(("text", pretty_timestamp)) + return "fixed", 10, urwid.Text(indicators) + + +def format_right_indicators( + *, + replay: bool, + marked: bool +): + indicators = [] + if replay: + indicators.append(("replay", SYMBOL_REPLAY)) + else: + indicators.append(" ") + if marked: + indicators.append(("mark", SYMBOL_MARK)) + else: + indicators.append(" ") + return "fixed", 2, urwid.Text(indicators) @lru_cache(maxsize=800) -def raw_format_list(f): - f = dict(f) - pile = [] +def format_http_flow_list( + *, + render_mode: RenderMode, + focused: bool, + marked: bool, + request_method: str, + request_scheme: str, + request_host: str, + request_path: str, + request_url: str, + request_http_version: str, + request_timestamp: float, + request_is_push_promise: bool, + request_is_replay: bool, + intercepted: bool, + response_code: typing.Optional[int], + response_reason: typing.Optional[str], + response_content_length: typing.Optional[int], + response_content_type: typing.Optional[str], + response_is_replay: bool, + duration: typing.Optional[float], + error_message: typing.Optional[str], +) -> urwid.Widget: req = [] - if f["extended"]: + + if render_mode is RenderMode.DETAILVIEW: + req.append(fcol(human.format_timestamp(request_timestamp), "highlight")) + else: + if focused: + req.append(fcol(">>", "focus")) + else: + req.append(fcol(" ", "focus")) + + method_style = HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other") + req.append(fcol(request_method, method_style)) + + if request_is_push_promise: + req.append(fcol('PUSH_PROMISE', 'method_http2_push')) + + preamble_len = sum(x[1] for x in req) + len(req) - 1 + + if request_http_version not in ("HTTP/1.0", "HTTP/1.1"): + request_url += " " + request_http_version + if intercepted and not response_code: + url_style = "intercept" + elif response_code or error_message: + url_style = "text" + else: + url_style = "title" + + if render_mode is RenderMode.DETAILVIEW: req.append( - fcol( - human.format_timestamp(f["req_timestamp"]), - "highlight" - ) + urwid.Text([(url_style, request_url)]) ) else: - req.append(fcol(">>" if f["focus"] else " ", "focus")) + req.append(truncated_plain(request_url, url_style)) - if f["marked"]: - req.append(fcol(SYMBOL_MARK, "mark")) + req.append(format_right_indicators(replay=request_is_replay or response_is_replay, marked=marked)) - if f["req_is_replay"]: - req.append(fcol(SYMBOL_REPLAY, "replay")) + resp = [ + ("fixed", preamble_len, urwid.Text("")) + ] + if response_code: + if intercepted: + style = "intercept" + else: + style = None - req.append(fcol(f["req_method"], "method")) - - preamble = sum(i[1] for i in req) + len(req) - 1 - - if f["intercepted"] and not f["acked"]: - uc = "intercept" - elif "resp_code" in f or "err_msg" in f: - uc = "text" - else: - uc = "title" - - url = f["req_url"] - - if f["cols"] and len(url) > f["cols"]: - url = url[:f["cols"]] + "…" - - if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): - url += " " + f["req_http_version"] - req.append( - urwid.Text([(uc, url)]) - ) - - pile.append(urwid.Columns(req, dividechars=1)) - - resp = [] - resp.append( - ("fixed", preamble, urwid.Text("")) - ) - - if "resp_code" in f: - codes = { - 2: "code_200", - 3: "code_300", - 4: "code_400", - 5: "code_500", - } - ccol = codes.get(f["resp_code"] // 100, "code_other") - resp.append(fcol(SYMBOL_RETURN, ccol)) - if f["resp_is_replay"]: + status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other") + resp.append(fcol(SYMBOL_RETURN, status_style)) + if response_is_replay: resp.append(fcol(SYMBOL_REPLAY, "replay")) - resp.append(fcol(f["resp_code"], ccol)) - if f["extended"]: - resp.append(fcol(f["resp_reason"], ccol)) - if f["intercepted"] and f["resp_code"] and not f["acked"]: - rc = "intercept" + resp.append(fcol(str(response_code), status_style)) + if render_mode is RenderMode.DETAILVIEW: + resp.append(fcol(response_reason, status_style)) + + if response_content_type: + ct, ct_style = format_http_content_type(response_content_type) + resp.append(fcol(ct, style or ct_style)) + + if response_content_length: + size, size_style = format_size(response_content_length) + elif response_content_length == 0: + size = "[no content]" + size_style = "text" else: - rc = "text" + size = "[content missing]" + size_style = "text" + resp.append(fcol(size, style or size_style)) - if f["resp_ctype"]: - resp.append(fcol(f["resp_ctype"], rc)) - resp.append(fcol(f["resp_clen"], rc)) - pretty_duration = human.pretty_duration(f["duration"]) - resp.append(fcol(pretty_duration, rc)) - - elif f["err_msg"]: + if duration: + dur, dur_style = format_duration(duration) + resp.append(fcol(dur, style or dur_style)) + elif error_message: resp.append(fcol(SYMBOL_RETURN, "error")) - resp.append( - urwid.Text([ - ( - "error", - f["err_msg"] - ) - ]) + resp.append(urwid.Text([("error", error_message)])) + + return urwid.Pile([ + urwid.Columns(req, dividechars=1), + urwid.Columns(resp, dividechars=1) + ]) + + +@lru_cache(maxsize=800) +def format_http_flow_table( + *, + render_mode: RenderMode, + focused: bool, + marked: bool, + request_method: str, + request_scheme: str, + request_host: str, + request_path: str, + request_url: str, + request_http_version: str, + request_timestamp: float, + request_is_push_promise: bool, + request_is_replay: bool, + intercepted: bool, + response_code: typing.Optional[int], + response_reason: typing.Optional[str], + response_content_length: typing.Optional[int], + response_content_type: typing.Optional[str], + response_is_replay: bool, + duration: typing.Optional[float], + error_message: typing.Optional[str], +) -> urwid.Widget: + items = [ + format_left_indicators( + focused=focused, + intercepted=intercepted, + timestamp=request_timestamp ) - pile.append(urwid.Columns(resp, dividechars=1)) - return urwid.Pile(pile) - - -@lru_cache(maxsize=800) -def raw_format_table(f): - f = dict(f) - pile = [] - req = [] - - cursor = [' ', 'focus'] - if f['focus']: - cursor[0] = '>' - req.append(fcol(*cursor)) - - if f.get('resp_is_replay', False) or f.get('req_is_replay', False): - req.append(fcol(SYMBOL_REPLAY, 'replay')) - if f['marked']: - req.append(fcol(SYMBOL_MARK, 'mark')) - - if f["two_line"]: - req.append(TruncatedText(f["req_url"], colorize_url(f["req_url"]), 'left')) - pile.append(urwid.Columns(req, dividechars=1)) - - req = [] - req.append(fcol(' ', 'text')) - - if f["intercepted"] and not f["acked"]: - uc = "intercept" - elif "resp_code" in f or f["err_msg"] is not None: - uc = "highlight" - else: - uc = "title" - - if f["extended"]: - s = human.format_timestamp(f["req_timestamp"]) - else: - s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(f["req_timestamp"]))).strftime("%H:%M:%S") - req.append(fcol(s, uc)) - - methods = { - 'GET': 'method_get', - 'POST': 'method_post', - 'DELETE': 'method_delete', - 'HEAD': 'method_head', - 'PUT': 'method_put' - } - uc = methods.get(f["req_method"], "method_other") - if f['extended']: - req.append(fcol(f["req_method"], uc)) - if f["req_promise"]: - req.append(fcol('PUSH_PROMISE', 'method_http2_push')) - else: - if f["req_promise"]: - uc = 'method_http2_push' - req.append(("fixed", 4, truncated_plain(f["req_method"], uc))) - - if f["two_line"]: - req.append(fcol(f["req_http_version"], 'text')) - else: - schemes = { - 'http': 'scheme_http', - 'https': 'scheme_https', - } - req.append(fcol(fixlen(f["req_scheme"].upper(), 5), schemes.get(f["req_scheme"], "scheme_other"))) - - req.append(('weight', 0.25, TruncatedText(f["req_host"], colorize_host(f["req_host"]), 'right'))) - req.append(('weight', 1.0, TruncatedText(f["req_path"], colorize_req(f["req_path"]), 'left'))) - - ret = (' ' * len(SYMBOL_RETURN), 'text') - status = ('', 'text') - content = ('', 'text') - size = ('', 'text') - duration = ('', 'text') - - if "resp_code" in f: - codes = { - 2: "code_200", - 3: "code_300", - 4: "code_400", - 5: "code_500", - } - ccol = codes.get(f["resp_code"] // 100, "code_other") - ret = (SYMBOL_RETURN, ccol) - status = (str(f["resp_code"]), ccol) - - if f["resp_len"] < 0: - if f["intercepted"] and f["resp_code"] and not f["acked"]: - rc = "intercept" - else: - rc = "content_none" - - if f["resp_len"] == -1: - contentdesc = "[content missing]" - else: - contentdesc = "[no content]" - content = (contentdesc, rc) - else: - if f["resp_ctype"]: - ctype = f["resp_ctype"].split(";")[0] - if ctype.endswith('/javascript'): - rc = 'content_script' - elif ctype.startswith('text/'): - rc = 'content_text' - elif (ctype.startswith('image/') or - ctype.startswith('video/') or - ctype.startswith('font/') or - "/x-font-" in ctype): - rc = 'content_media' - elif ctype.endswith('/json') or ctype.endswith('/xml'): - rc = 'content_data' - elif ctype.startswith('application/'): - rc = 'content_raw' - else: - rc = 'content_other' - content = (ctype, rc) - - rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + f["resp_len"]) / 20, 0.99)) - - size_str = human.pretty_size(f["resp_len"]) - if not f['extended']: - # shorten to 5 chars max - if len(size_str) > 5: - size_str = size_str[0:4].rstrip('.') + size_str[-1:] - size = (size_str, rc) - - if f['duration'] is not None: - rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * f['duration']) / 12, 0.99)) - duration = (human.pretty_duration(f['duration']), rc) - - elif f["err_msg"]: - status = ('Err', 'error') - content = f["err_msg"], 'error' - - if f["two_line"]: - req.append(fcol(*ret)) - req.append(fcol(fixlen(status[0], 3), status[1])) - req.append(('weight', 0.15, truncated_plain(content[0], content[1], 'right'))) - if f['extended']: - req.append(fcol(*size)) - else: - req.append(fcol(fixlen_r(size[0], 5), size[1])) - req.append(fcol(fixlen_r(duration[0], 5), duration[1])) - - pile.append(urwid.Columns(req, dividechars=1, min_width=15)) - - return urwid.Pile(pile) - - -# TODO: this function can replace repeated code in raw_format_table() in the future -def raw_format_cursor(f): - cursor = [" ", "focus"] - if f["focus"]: - cursor[0] = ">" - return fcol(*cursor) - - -# TODO: this function can replace repeated code in raw_format_table() in the future -def raw_format_timestamp(timestamp, extended): - if extended: - s = human.format_timestamp(timestamp) - else: - s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(timestamp))).strftime("%H:%M:%S") - return fcol(s, "title") - - -@lru_cache(maxsize=800) -def raw_format_tcp_table(f): - # If you'll remove this line TypeError: unhashable type: 'dict' will occur - # because of @lru_cache - f = dict(f) - - pile = [] - - columns = [ - raw_format_cursor(f), - raw_format_timestamp(f["timestamp_start"], f["extended"]), - fcol("TCP", "tcp"), - fcol(f["client"], "client"), - fcol("---", "direction"), - fcol(f["server"], "server"), ] - m = [c for c in columns] + if intercepted and not response_code: + request_style = "intercept" + else: + request_style = None - pile.append(urwid.Columns(m, dividechars=1)) + scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other") + items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style)) - return urwid.Pile(pile) + if request_is_push_promise: + method_style = 'method_http2_push' + else: + method_style = request_style or HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other") + items.append(fcol(fixlen(request_method, 4), method_style)) + + items.append(('weight', 0.25, TruncatedText(request_host, colorize_host(request_host), 'right'))) + items.append(('weight', 1.0, TruncatedText(request_path, colorize_req(request_path), 'left'))) + + if intercepted and response_code: + response_style = "intercept" + else: + response_style = None + + if response_code: + + status = str(response_code) + status_style = response_style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other") + + if response_content_length and response_content_type: + content, content_style = format_http_content_type(response_content_type) + content_style = response_style or content_style + elif response_content_length: + content = '' + content_style = 'content_none' + elif response_content_length == 0: + content = "[no content]" + content_style = 'content_none' + else: + content = "[content missing]" + content_style = 'content_none' + + elif error_message: + status = 'err' + status_style = 'error' + content = error_message + content_style = 'error' + + else: + status = '' + status_style = 'text' + content = '' + content_style = '' + + items.append(fcol(fixlen(status, 3), status_style)) + items.append(('weight', 0.15, truncated_plain(content, content_style, 'right'))) + + if response_content_length: + size, size_style = format_size(response_content_length) + items.append(fcol(fixlen_r(size, 5), response_style or size_style)) + else: + items.append(("fixed", 5, urwid.Text(""))) + + if duration: + duration_pretty, duration_style = format_duration(duration) + items.append(fcol(fixlen_r(duration_pretty, 5), response_style or duration_style)) + else: + items.append(("fixed", 5, urwid.Text(""))) + + items.append(format_right_indicators( + replay=request_is_replay or response_is_replay, + marked=marked + )) + return urwid.Columns(items, dividechars=1, min_width=15) -def format_flow(f, focus, extended=False, hostheader=False, cols=False, layout='default'): +@lru_cache(maxsize=800) +def raw_format_tcp_flow( + *, + render_mode: RenderMode, + focused: typing.Optional[bool], + timestamp_start: float, + marked: bool, + client_address, + server_address, + total_size: int, + duration: typing.Optional[float], +): + conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}" + + items = [] + + if render_mode in (RenderMode.TABLE, RenderMode.DETAILVIEW): + items.append( + format_left_indicators(focused=focused, intercepted=False, timestamp=timestamp_start) + ) + else: + if focused: + items.append(fcol(">>", "focus")) + else: + items.append(fcol(" ", "focus")) + + if render_mode is RenderMode.TABLE: + items.append(fcol("TCP ", SCHEME_STYLES["tcp"])) + else: + items.append(fcol("TCP", SCHEME_STYLES["tcp"])) + + items.append(('weight', 1.0, truncated_plain(conn, "text", 'left'))) + + if total_size: + size, size_style = format_size(total_size) + items.append(fcol(fixlen_r(size, 5), size_style)) + else: + items.append(("fixed", 5, urwid.Text(""))) + + if duration: + duration_pretty, duration_style = format_duration(duration) + items.append(fcol(fixlen_r(duration_pretty, 5), duration_style)) + else: + items.append(("fixed", 5, urwid.Text(""))) + + items.append(format_right_indicators(replay=False, marked=marked)) + + return urwid.Pile([ + urwid.Columns(items, dividechars=1, min_width=15) + ]) + + +def format_flow( + f: flow.Flow, + *, + render_mode: RenderMode, + hostheader: bool = False, # pass options directly if we need more stuff from them + focused: bool = True, +) -> urwid.Widget: + """ + This functions calls the proper renderer depending on the flow type. + We also want to cache the renderer output, so we extract all attributes + relevant for display and call the render with only that. This assures that rows + are updated if the flow is changed. + """ if isinstance(f, TCPFlow): - d = dict( - focus=focus, - extended=extended, + total_size = 0 + for message in f.messages: + total_size += len(message.content) + if f.messages: + duration = f.messages[-1].timestamp - f.timestamp_start + else: + duration = None + return raw_format_tcp_flow( + render_mode=render_mode, + focused=focused, timestamp_start=f.timestamp_start, - client=human.format_address(f.client_conn.address), - server=human.format_address(f.server_conn.address), + marked=f.marked, + client_address=f.client_conn.address, + server_address=f.server_conn.address, + total_size=total_size, + duration=duration, + ) + elif isinstance(f, HTTPFlow): + intercepted = ( + f.intercepted and not (f.reply and f.reply.state == "committed") + ) + if f.response: + if f.response.raw_content is not None: + response_content_length = len(f.response.raw_content) + else: + response_content_length = None + response_code = f.response.status_code + response_reason = f.response.reason + response_content_type = f.response.headers.get("content-type") + response_is_replay = f.response.is_replay + if f.response.timestamp_end: + duration = max([f.response.timestamp_end - f.request.timestamp_start, 0]) + else: + duration = None + else: + response_content_length = None + response_code = None + response_reason = None + response_content_type = None + response_is_replay = False + duration = None + if f.error: + error_message = f.error.msg + else: + error_message = None + + if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW): + render_func = format_http_flow_list + else: + render_func = format_http_flow_table + return render_func( + render_mode=render_mode, + focused=focused, + marked=f.marked, + request_method=f.request.method, + request_scheme=f.request.scheme, + request_host=f.request.pretty_host if hostheader else f.request.host, + request_path=f.request.path, + request_url=f.request.pretty_url if hostheader else f.request.url, + request_http_version=f.request.http_version, + request_timestamp=f.request.timestamp_start, + request_is_push_promise='h2-pushed-stream' in f.metadata, + request_is_replay=f.request.is_replay, + intercepted=intercepted, + response_code=response_code, + response_reason=response_reason, + response_content_length=response_content_length, + response_content_type=response_content_type, + response_is_replay=response_is_replay, + duration=duration, + error_message=error_message ) - # If you'll remove this line TypeError: unhashable type: 'dict' will occur - # because of @lru_cache. - t = tuple(sorted(d.items())) - - return raw_format_tcp_table(t) - - acked = False - if f.reply and f.reply.state == "committed": - acked = True - d = dict( - focus=focus, - extended=extended, - two_line=extended or cols < 100, - cols=cols, - intercepted=f.intercepted, - acked=acked, - req_timestamp=f.request.timestamp_start, - req_is_replay=f.request.is_replay, - req_method=f.request.method, - req_promise='h2-pushed-stream' in f.metadata, - req_url=f.request.pretty_url if hostheader else f.request.url, - req_scheme=f.request.scheme, - req_host=f.request.pretty_host if hostheader else f.request.host, - req_path=f.request.path, - req_http_version=f.request.http_version, - err_msg=f.error.msg if f.error else None, - marked=f.marked, - ) - if f.response: - if f.response.raw_content: - content_len = len(f.response.raw_content) - contentdesc = human.pretty_size(len(f.response.raw_content)) - elif f.response.raw_content is None: - content_len = -1 - contentdesc = "[content missing]" - else: - content_len = -2 - contentdesc = "[no content]" - - duration = None - if f.response.timestamp_end and f.request.timestamp_start: - duration = max([f.response.timestamp_end - f.request.timestamp_start, 0]) - - d.update(dict( - resp_code=f.response.status_code, - resp_reason=f.response.reason, - resp_is_replay=f.response.is_replay, - resp_len=content_len, - resp_ctype=f.response.headers.get("content-type"), - resp_clen=contentdesc, - duration=duration, - )) - - if ((layout == 'default' and cols < 100) or layout == "list"): - return raw_format_list(tuple(sorted(d.items()))) else: - return raw_format_table(tuple(sorted(d.items()))) + raise NotImplementedError() diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 905653e71..129d889fd 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -112,7 +112,7 @@ class ConsoleAddon: choices=sorted(console_palettes), ) loader.add_option( - "console_palette_transparent", bool, False, + "console_palette_transparent", bool, True, "Set transparent background for palette." ) loader.add_option( diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index 9650c0d39..24d4c96b2 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -14,12 +14,17 @@ class FlowItem(urwid.WidgetWrap): def get_text(self): cols, _ = self.master.ui.get_cols_rows() + layout = self.master.options.console_flowlist_layout + if layout == "list" or (layout == 'default' and cols < 100): + render_mode = common.RenderMode.LIST + else: + render_mode = common.RenderMode.TABLE + return common.format_flow( self.flow, - self.flow is self.master.view.focus.flow, + render_mode=render_mode, + focused=self.flow is self.master.view.focus.flow, hostheader=self.master.options.showhost, - cols=cols, - layout=self.master.options.console_flowlist_layout ) def selectable(self): diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py index 2fa1df1f2..60321e462 100644 --- a/mitmproxy/tools/console/flowview.py +++ b/mitmproxy/tools/console/flowview.py @@ -35,11 +35,8 @@ class FlowViewHeader(urwid.WidgetWrap): if self.master.view.focus.flow: self._w = common.format_flow( self.master.view.focus.flow, - False, - extended=True, + render_mode=common.RenderMode.DETAILVIEW, hostheader=self.master.options.showhost, - cols=cols, - layout=self.master.options.console_flowlist_layout ) else: self._w = urwid.Pile([]) diff --git a/mitmproxy/tools/console/palettes.py b/mitmproxy/tools/console/palettes.py index 6033ff25b..0a1dd8dfe 100644 --- a/mitmproxy/tools/console/palettes.py +++ b/mitmproxy/tools/console/palettes.py @@ -22,9 +22,8 @@ class Palette: 'option_selected_key', # List and Connections - 'method', 'method_get', 'method_post', 'method_delete', 'method_other', 'method_head', 'method_put', 'method_http2_push', - 'scheme_http', 'scheme_https', 'scheme_other', + 'scheme_http', 'scheme_https', 'scheme_tcp', 'scheme_other', 'url_punctuation', 'url_domain', 'url_filename', 'url_extension', 'url_query_key', 'url_query_value', 'content_none', 'content_text', 'content_script', 'content_media', 'content_data', 'content_raw', 'content_other', 'focus', @@ -121,7 +120,6 @@ class LowDark(Palette): option_active_selected = ('light red', 'light gray'), # List and Connections - method = ('dark cyan', 'default'), method_get = ('light green', 'default'), method_post = ('brown', 'default'), method_delete = ('light red', 'default'), @@ -132,6 +130,7 @@ class LowDark(Palette): scheme_http = ('dark cyan', 'default'), scheme_https = ('dark green', 'default'), + scheme_tcp=('dark magenta', 'default'), scheme_other = ('dark magenta', 'default'), url_punctuation = ('light gray', 'default'), @@ -221,7 +220,6 @@ class LowLight(Palette): option_active_selected = ('light red', 'light gray'), # List and Connections - method = ('dark cyan', 'default'), method_get = ('dark green', 'default'), method_post = ('brown', 'default'), method_head = ('dark cyan', 'default'), @@ -232,6 +230,7 @@ class LowLight(Palette): scheme_http = ('dark cyan', 'default'), scheme_https = ('light green', 'default'), + scheme_tcp=('light magenta', 'default'), scheme_other = ('light magenta', 'default'), url_punctuation = ('dark gray', 'default'), @@ -340,7 +339,6 @@ class SolarizedLight(LowLight): # List and Connections - method = ('dark cyan', 'default'), method_get = (sol_green, 'default'), method_post = (sol_orange, 'default'), method_head = (sol_cyan, 'default'), @@ -351,6 +349,7 @@ class SolarizedLight(LowLight): scheme_http = (sol_cyan, 'default'), scheme_https = ('light green', 'default'), + scheme_tcp=('light magenta', 'default'), scheme_other = ('light magenta', 'default'), url_punctuation = ('dark gray', 'default'), @@ -416,7 +415,6 @@ class SolarizedDark(LowDark): # List and Connections focus = (sol_base1, 'default'), - method = (sol_cyan, 'default'), method_get = (sol_green, 'default'), method_post = (sol_orange, 'default'), method_delete = (sol_red, 'default'),