fixup flow rendering

this was a convoluted mess before and a nightmare to maintain.
the new implementation is a bit more verbose, but it can be type-checked
for errors.
This commit is contained in:
Maximilian Hils 2020-04-11 23:59:31 +02:00
parent f96b41b6e6
commit d33857588c
5 changed files with 449 additions and 341 deletions

View File

@ -1,3 +1,4 @@
import enum
import platform
import typing
import datetime
@ -9,6 +10,8 @@ from publicsuffix2 import get_sld, get_tld
import urwid
import urwid.util
from mitmproxy import flow
from mitmproxy.http import HTTPFlow
from mitmproxy.utils import human
from mitmproxy.tcp import TCPFlow
@ -83,7 +86,7 @@ def format_keyvals(
return ret
def fcol(s, attr):
def fcol(s: str, attr: str) -> typing.Tuple[str, int, urwid.Text]:
s = str(s)
return (
"fixed",
@ -106,20 +109,48 @@ if urwid.util.detected_encoding:
else:
SYMBOL_REPLAY = u"[r]"
SYMBOL_RETURN = u"<-"
SYMBOL_MARK = "[m]"
SYMBOL_MARK = "#"
SYMBOL_UP = "^"
SYMBOL_DOWN = " "
SYMBOL_ELLIPSIS = "~"
SCHEME_STYLES = {
'http': 'scheme_http',
'https': 'scheme_https',
'tcp': 'scheme_tcp',
}
HTTP_REQUEST_METHOD_STYLES = {
'GET': 'method_get',
'POST': 'method_post',
'DELETE': 'method_delete',
'HEAD': 'method_head',
'PUT': 'method_put'
}
HTTP_RESPONSE_CODE_STYLE = {
2: "code_200",
3: "code_300",
4: "code_400",
5: "code_500",
}
def fixlen(s, maxlen):
class RenderMode(enum.Enum):
TABLE = 1
"""The flow list in table format, i.e. one row per flow."""
LIST = 2
"""The flow list in list format, i.e. potentially multiple rows per flow."""
DETAILVIEW = 3
"""The top lines in the detail view."""
def fixlen(s: str, maxlen: int) -> str:
if len(s) <= maxlen:
return s.ljust(maxlen)
else:
return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS
def fixlen_r(s, maxlen):
def fixlen_r(s: str, maxlen: int) -> str:
if len(s) <= maxlen:
return s.rjust(maxlen)
else:
@ -234,8 +265,8 @@ def colorize_req(s):
for i in range(len(s)):
c = s[i]
if ((i < i_query and c == '/') or
(i < i_query and i > i_last_slash and c == '.') or
(i == i_query)):
(i < i_query and i > i_last_slash and c == '.') or
(i == i_query)):
a = 'url_punctuation'
elif i > i_query:
if in_val:
@ -269,351 +300,428 @@ def colorize_url(url):
'https:': 'scheme_https',
}
return [
(schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1),
('url_punctuation', 3), # ://
] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
(schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1),
('url_punctuation', 3), # ://
] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
def format_http_content_type(content_type: str) -> typing.Tuple[str, str]:
content_type = content_type.split(";")[0]
if content_type.endswith('/javascript'):
style = 'content_script'
elif content_type.startswith('text/'):
style = 'content_text'
elif (content_type.startswith('image/') or
content_type.startswith('video/') or
content_type.startswith('font/') or
"/x-font-" in content_type):
style = 'content_media'
elif content_type.endswith('/json') or content_type.endswith('/xml'):
style = 'content_data'
elif content_type.startswith('application/'):
style = 'content_raw'
else:
style = 'content_other'
return content_type, style
def format_duration(duration: float) -> typing.Tuple[str, str]:
pretty_duration = human.pretty_duration(duration)
style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * duration) / 12, 0.99))
return pretty_duration, style
def format_size(num_bytes: int) -> typing.Tuple[str, str]:
pretty_size = human.pretty_size(num_bytes)
style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + num_bytes) / 20, 0.99))
return pretty_size, style
def format_left_indicators(
*,
focused: bool,
intercepted: bool,
timestamp: float
):
indicators = []
if focused:
indicators.append(("focus", ">>"))
else:
indicators.append(" ")
pretty_timestamp = human.format_timestamp(timestamp)[-8:]
if intercepted:
indicators.append(("intercept", pretty_timestamp))
else:
indicators.append(("text", pretty_timestamp))
return "fixed", 10, urwid.Text(indicators)
def format_right_indicators(
*,
replay: bool,
marked: bool
):
indicators = []
if replay:
indicators.append(("replay", SYMBOL_REPLAY))
else:
indicators.append(" ")
if marked:
indicators.append(("mark", SYMBOL_MARK))
else:
indicators.append(" ")
return "fixed", 2, urwid.Text(indicators)
@lru_cache(maxsize=800)
def raw_format_list(f):
f = dict(f)
pile = []
def format_http_flow_list(
*,
render_mode: RenderMode,
focused: bool,
marked: bool,
request_method: str,
request_scheme: str,
request_host: str,
request_path: str,
request_url: str,
request_http_version: str,
request_timestamp: float,
request_is_push_promise: bool,
request_is_replay: bool,
intercepted: bool,
response_code: typing.Optional[int],
response_reason: typing.Optional[str],
response_content_length: typing.Optional[int],
response_content_type: typing.Optional[str],
response_is_replay: bool,
duration: typing.Optional[float],
error_message: typing.Optional[str],
) -> urwid.Widget:
req = []
if f["extended"]:
if render_mode is RenderMode.DETAILVIEW:
req.append(fcol(human.format_timestamp(request_timestamp), "highlight"))
else:
if focused:
req.append(fcol(">>", "focus"))
else:
req.append(fcol(" ", "focus"))
method_style = HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
req.append(fcol(request_method, method_style))
if request_is_push_promise:
req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
preamble_len = sum(x[1] for x in req) + len(req) - 1
if request_http_version not in ("HTTP/1.0", "HTTP/1.1"):
request_url += " " + request_http_version
if intercepted and not response_code:
url_style = "intercept"
elif response_code or error_message:
url_style = "text"
else:
url_style = "title"
if render_mode is RenderMode.DETAILVIEW:
req.append(
fcol(
human.format_timestamp(f["req_timestamp"]),
"highlight"
)
urwid.Text([(url_style, request_url)])
)
else:
req.append(fcol(">>" if f["focus"] else " ", "focus"))
req.append(truncated_plain(request_url, url_style))
if f["marked"]:
req.append(fcol(SYMBOL_MARK, "mark"))
req.append(format_right_indicators(replay=request_is_replay or response_is_replay, marked=marked))
if f["req_is_replay"]:
req.append(fcol(SYMBOL_REPLAY, "replay"))
resp = [
("fixed", preamble_len, urwid.Text(""))
]
if response_code:
if intercepted:
style = "intercept"
else:
style = None
req.append(fcol(f["req_method"], "method"))
preamble = sum(i[1] for i in req) + len(req) - 1
if f["intercepted"] and not f["acked"]:
uc = "intercept"
elif "resp_code" in f or "err_msg" in f:
uc = "text"
else:
uc = "title"
url = f["req_url"]
if f["cols"] and len(url) > f["cols"]:
url = url[:f["cols"]] + ""
if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
url += " " + f["req_http_version"]
req.append(
urwid.Text([(uc, url)])
)
pile.append(urwid.Columns(req, dividechars=1))
resp = []
resp.append(
("fixed", preamble, urwid.Text(""))
)
if "resp_code" in f:
codes = {
2: "code_200",
3: "code_300",
4: "code_400",
5: "code_500",
}
ccol = codes.get(f["resp_code"] // 100, "code_other")
resp.append(fcol(SYMBOL_RETURN, ccol))
if f["resp_is_replay"]:
status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
resp.append(fcol(SYMBOL_RETURN, status_style))
if response_is_replay:
resp.append(fcol(SYMBOL_REPLAY, "replay"))
resp.append(fcol(f["resp_code"], ccol))
if f["extended"]:
resp.append(fcol(f["resp_reason"], ccol))
if f["intercepted"] and f["resp_code"] and not f["acked"]:
rc = "intercept"
resp.append(fcol(str(response_code), status_style))
if render_mode is RenderMode.DETAILVIEW:
resp.append(fcol(response_reason, status_style))
if response_content_type:
ct, ct_style = format_http_content_type(response_content_type)
resp.append(fcol(ct, style or ct_style))
if response_content_length:
size, size_style = format_size(response_content_length)
elif response_content_length == 0:
size = "[no content]"
size_style = "text"
else:
rc = "text"
size = "[content missing]"
size_style = "text"
resp.append(fcol(size, style or size_style))
if f["resp_ctype"]:
resp.append(fcol(f["resp_ctype"], rc))
resp.append(fcol(f["resp_clen"], rc))
pretty_duration = human.pretty_duration(f["duration"])
resp.append(fcol(pretty_duration, rc))
elif f["err_msg"]:
if duration:
dur, dur_style = format_duration(duration)
resp.append(fcol(dur, style or dur_style))
elif error_message:
resp.append(fcol(SYMBOL_RETURN, "error"))
resp.append(
urwid.Text([
(
"error",
f["err_msg"]
)
])
resp.append(urwid.Text([("error", error_message)]))
return urwid.Pile([
urwid.Columns(req, dividechars=1),
urwid.Columns(resp, dividechars=1)
])
@lru_cache(maxsize=800)
def format_http_flow_table(
*,
render_mode: RenderMode,
focused: bool,
marked: bool,
request_method: str,
request_scheme: str,
request_host: str,
request_path: str,
request_url: str,
request_http_version: str,
request_timestamp: float,
request_is_push_promise: bool,
request_is_replay: bool,
intercepted: bool,
response_code: typing.Optional[int],
response_reason: typing.Optional[str],
response_content_length: typing.Optional[int],
response_content_type: typing.Optional[str],
response_is_replay: bool,
duration: typing.Optional[float],
error_message: typing.Optional[str],
) -> urwid.Widget:
items = [
format_left_indicators(
focused=focused,
intercepted=intercepted,
timestamp=request_timestamp
)
pile.append(urwid.Columns(resp, dividechars=1))
return urwid.Pile(pile)
@lru_cache(maxsize=800)
def raw_format_table(f):
f = dict(f)
pile = []
req = []
cursor = [' ', 'focus']
if f['focus']:
cursor[0] = '>'
req.append(fcol(*cursor))
if f.get('resp_is_replay', False) or f.get('req_is_replay', False):
req.append(fcol(SYMBOL_REPLAY, 'replay'))
if f['marked']:
req.append(fcol(SYMBOL_MARK, 'mark'))
if f["two_line"]:
req.append(TruncatedText(f["req_url"], colorize_url(f["req_url"]), 'left'))
pile.append(urwid.Columns(req, dividechars=1))
req = []
req.append(fcol(' ', 'text'))
if f["intercepted"] and not f["acked"]:
uc = "intercept"
elif "resp_code" in f or f["err_msg"] is not None:
uc = "highlight"
else:
uc = "title"
if f["extended"]:
s = human.format_timestamp(f["req_timestamp"])
else:
s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(f["req_timestamp"]))).strftime("%H:%M:%S")
req.append(fcol(s, uc))
methods = {
'GET': 'method_get',
'POST': 'method_post',
'DELETE': 'method_delete',
'HEAD': 'method_head',
'PUT': 'method_put'
}
uc = methods.get(f["req_method"], "method_other")
if f['extended']:
req.append(fcol(f["req_method"], uc))
if f["req_promise"]:
req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
else:
if f["req_promise"]:
uc = 'method_http2_push'
req.append(("fixed", 4, truncated_plain(f["req_method"], uc)))
if f["two_line"]:
req.append(fcol(f["req_http_version"], 'text'))
else:
schemes = {
'http': 'scheme_http',
'https': 'scheme_https',
}
req.append(fcol(fixlen(f["req_scheme"].upper(), 5), schemes.get(f["req_scheme"], "scheme_other")))
req.append(('weight', 0.25, TruncatedText(f["req_host"], colorize_host(f["req_host"]), 'right')))
req.append(('weight', 1.0, TruncatedText(f["req_path"], colorize_req(f["req_path"]), 'left')))
ret = (' ' * len(SYMBOL_RETURN), 'text')
status = ('', 'text')
content = ('', 'text')
size = ('', 'text')
duration = ('', 'text')
if "resp_code" in f:
codes = {
2: "code_200",
3: "code_300",
4: "code_400",
5: "code_500",
}
ccol = codes.get(f["resp_code"] // 100, "code_other")
ret = (SYMBOL_RETURN, ccol)
status = (str(f["resp_code"]), ccol)
if f["resp_len"] < 0:
if f["intercepted"] and f["resp_code"] and not f["acked"]:
rc = "intercept"
else:
rc = "content_none"
if f["resp_len"] == -1:
contentdesc = "[content missing]"
else:
contentdesc = "[no content]"
content = (contentdesc, rc)
else:
if f["resp_ctype"]:
ctype = f["resp_ctype"].split(";")[0]
if ctype.endswith('/javascript'):
rc = 'content_script'
elif ctype.startswith('text/'):
rc = 'content_text'
elif (ctype.startswith('image/') or
ctype.startswith('video/') or
ctype.startswith('font/') or
"/x-font-" in ctype):
rc = 'content_media'
elif ctype.endswith('/json') or ctype.endswith('/xml'):
rc = 'content_data'
elif ctype.startswith('application/'):
rc = 'content_raw'
else:
rc = 'content_other'
content = (ctype, rc)
rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + f["resp_len"]) / 20, 0.99))
size_str = human.pretty_size(f["resp_len"])
if not f['extended']:
# shorten to 5 chars max
if len(size_str) > 5:
size_str = size_str[0:4].rstrip('.') + size_str[-1:]
size = (size_str, rc)
if f['duration'] is not None:
rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * f['duration']) / 12, 0.99))
duration = (human.pretty_duration(f['duration']), rc)
elif f["err_msg"]:
status = ('Err', 'error')
content = f["err_msg"], 'error'
if f["two_line"]:
req.append(fcol(*ret))
req.append(fcol(fixlen(status[0], 3), status[1]))
req.append(('weight', 0.15, truncated_plain(content[0], content[1], 'right')))
if f['extended']:
req.append(fcol(*size))
else:
req.append(fcol(fixlen_r(size[0], 5), size[1]))
req.append(fcol(fixlen_r(duration[0], 5), duration[1]))
pile.append(urwid.Columns(req, dividechars=1, min_width=15))
return urwid.Pile(pile)
# TODO: this function can replace repeated code in raw_format_table() in the future
def raw_format_cursor(f):
cursor = [" ", "focus"]
if f["focus"]:
cursor[0] = ">"
return fcol(*cursor)
# TODO: this function can replace repeated code in raw_format_table() in the future
def raw_format_timestamp(timestamp, extended):
if extended:
s = human.format_timestamp(timestamp)
else:
s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(timestamp))).strftime("%H:%M:%S")
return fcol(s, "title")
@lru_cache(maxsize=800)
def raw_format_tcp_table(f):
# If you'll remove this line TypeError: unhashable type: 'dict' will occur
# because of @lru_cache
f = dict(f)
pile = []
columns = [
raw_format_cursor(f),
raw_format_timestamp(f["timestamp_start"], f["extended"]),
fcol("TCP", "tcp"),
fcol(f["client"], "client"),
fcol("---", "direction"),
fcol(f["server"], "server"),
]
m = [c for c in columns]
if intercepted and not response_code:
request_style = "intercept"
else:
request_style = None
pile.append(urwid.Columns(m, dividechars=1))
scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other")
items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style))
return urwid.Pile(pile)
if request_is_push_promise:
method_style = 'method_http2_push'
else:
method_style = request_style or HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
items.append(fcol(fixlen(request_method, 4), method_style))
items.append(('weight', 0.25, TruncatedText(request_host, colorize_host(request_host), 'right')))
items.append(('weight', 1.0, TruncatedText(request_path, colorize_req(request_path), 'left')))
if intercepted and response_code:
response_style = "intercept"
else:
response_style = None
if response_code:
status = str(response_code)
status_style = response_style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
if response_content_length and response_content_type:
content, content_style = format_http_content_type(response_content_type)
content_style = response_style or content_style
elif response_content_length:
content = ''
content_style = 'content_none'
elif response_content_length == 0:
content = "[no content]"
content_style = 'content_none'
else:
content = "[content missing]"
content_style = 'content_none'
elif error_message:
status = 'err'
status_style = 'error'
content = error_message
content_style = 'error'
else:
status = ''
status_style = 'text'
content = ''
content_style = ''
items.append(fcol(fixlen(status, 3), status_style))
items.append(('weight', 0.15, truncated_plain(content, content_style, 'right')))
if response_content_length:
size, size_style = format_size(response_content_length)
items.append(fcol(fixlen_r(size, 5), response_style or size_style))
else:
items.append(("fixed", 5, urwid.Text("")))
if duration:
duration_pretty, duration_style = format_duration(duration)
items.append(fcol(fixlen_r(duration_pretty, 5), response_style or duration_style))
else:
items.append(("fixed", 5, urwid.Text("")))
items.append(format_right_indicators(
replay=request_is_replay or response_is_replay,
marked=marked
))
return urwid.Columns(items, dividechars=1, min_width=15)
def format_flow(f, focus, extended=False, hostheader=False, cols=False, layout='default'):
@lru_cache(maxsize=800)
def raw_format_tcp_flow(
*,
render_mode: RenderMode,
focused: typing.Optional[bool],
timestamp_start: float,
marked: bool,
client_address,
server_address,
total_size: int,
duration: typing.Optional[float],
):
conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}"
items = []
if render_mode in (RenderMode.TABLE, RenderMode.DETAILVIEW):
items.append(
format_left_indicators(focused=focused, intercepted=False, timestamp=timestamp_start)
)
else:
if focused:
items.append(fcol(">>", "focus"))
else:
items.append(fcol(" ", "focus"))
if render_mode is RenderMode.TABLE:
items.append(fcol("TCP ", SCHEME_STYLES["tcp"]))
else:
items.append(fcol("TCP", SCHEME_STYLES["tcp"]))
items.append(('weight', 1.0, truncated_plain(conn, "text", 'left')))
if total_size:
size, size_style = format_size(total_size)
items.append(fcol(fixlen_r(size, 5), size_style))
else:
items.append(("fixed", 5, urwid.Text("")))
if duration:
duration_pretty, duration_style = format_duration(duration)
items.append(fcol(fixlen_r(duration_pretty, 5), duration_style))
else:
items.append(("fixed", 5, urwid.Text("")))
items.append(format_right_indicators(replay=False, marked=marked))
return urwid.Pile([
urwid.Columns(items, dividechars=1, min_width=15)
])
def format_flow(
f: flow.Flow,
*,
render_mode: RenderMode,
hostheader: bool = False, # pass options directly if we need more stuff from them
focused: bool = True,
) -> urwid.Widget:
"""
This functions calls the proper renderer depending on the flow type.
We also want to cache the renderer output, so we extract all attributes
relevant for display and call the render with only that. This assures that rows
are updated if the flow is changed.
"""
if isinstance(f, TCPFlow):
d = dict(
focus=focus,
extended=extended,
total_size = 0
for message in f.messages:
total_size += len(message.content)
if f.messages:
duration = f.messages[-1].timestamp - f.timestamp_start
else:
duration = None
return raw_format_tcp_flow(
render_mode=render_mode,
focused=focused,
timestamp_start=f.timestamp_start,
client=human.format_address(f.client_conn.address),
server=human.format_address(f.server_conn.address),
marked=f.marked,
client_address=f.client_conn.address,
server_address=f.server_conn.address,
total_size=total_size,
duration=duration,
)
elif isinstance(f, HTTPFlow):
intercepted = (
f.intercepted and not (f.reply and f.reply.state == "committed")
)
if f.response:
if f.response.raw_content is not None:
response_content_length = len(f.response.raw_content)
else:
response_content_length = None
response_code = f.response.status_code
response_reason = f.response.reason
response_content_type = f.response.headers.get("content-type")
response_is_replay = f.response.is_replay
if f.response.timestamp_end:
duration = max([f.response.timestamp_end - f.request.timestamp_start, 0])
else:
duration = None
else:
response_content_length = None
response_code = None
response_reason = None
response_content_type = None
response_is_replay = False
duration = None
if f.error:
error_message = f.error.msg
else:
error_message = None
if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW):
render_func = format_http_flow_list
else:
render_func = format_http_flow_table
return render_func(
render_mode=render_mode,
focused=focused,
marked=f.marked,
request_method=f.request.method,
request_scheme=f.request.scheme,
request_host=f.request.pretty_host if hostheader else f.request.host,
request_path=f.request.path,
request_url=f.request.pretty_url if hostheader else f.request.url,
request_http_version=f.request.http_version,
request_timestamp=f.request.timestamp_start,
request_is_push_promise='h2-pushed-stream' in f.metadata,
request_is_replay=f.request.is_replay,
intercepted=intercepted,
response_code=response_code,
response_reason=response_reason,
response_content_length=response_content_length,
response_content_type=response_content_type,
response_is_replay=response_is_replay,
duration=duration,
error_message=error_message
)
# If you'll remove this line TypeError: unhashable type: 'dict' will occur
# because of @lru_cache.
t = tuple(sorted(d.items()))
return raw_format_tcp_table(t)
acked = False
if f.reply and f.reply.state == "committed":
acked = True
d = dict(
focus=focus,
extended=extended,
two_line=extended or cols < 100,
cols=cols,
intercepted=f.intercepted,
acked=acked,
req_timestamp=f.request.timestamp_start,
req_is_replay=f.request.is_replay,
req_method=f.request.method,
req_promise='h2-pushed-stream' in f.metadata,
req_url=f.request.pretty_url if hostheader else f.request.url,
req_scheme=f.request.scheme,
req_host=f.request.pretty_host if hostheader else f.request.host,
req_path=f.request.path,
req_http_version=f.request.http_version,
err_msg=f.error.msg if f.error else None,
marked=f.marked,
)
if f.response:
if f.response.raw_content:
content_len = len(f.response.raw_content)
contentdesc = human.pretty_size(len(f.response.raw_content))
elif f.response.raw_content is None:
content_len = -1
contentdesc = "[content missing]"
else:
content_len = -2
contentdesc = "[no content]"
duration = None
if f.response.timestamp_end and f.request.timestamp_start:
duration = max([f.response.timestamp_end - f.request.timestamp_start, 0])
d.update(dict(
resp_code=f.response.status_code,
resp_reason=f.response.reason,
resp_is_replay=f.response.is_replay,
resp_len=content_len,
resp_ctype=f.response.headers.get("content-type"),
resp_clen=contentdesc,
duration=duration,
))
if ((layout == 'default' and cols < 100) or layout == "list"):
return raw_format_list(tuple(sorted(d.items())))
else:
return raw_format_table(tuple(sorted(d.items())))
raise NotImplementedError()

View File

@ -112,7 +112,7 @@ class ConsoleAddon:
choices=sorted(console_palettes),
)
loader.add_option(
"console_palette_transparent", bool, False,
"console_palette_transparent", bool, True,
"Set transparent background for palette."
)
loader.add_option(

View File

@ -14,12 +14,17 @@ class FlowItem(urwid.WidgetWrap):
def get_text(self):
cols, _ = self.master.ui.get_cols_rows()
layout = self.master.options.console_flowlist_layout
if layout == "list" or (layout == 'default' and cols < 100):
render_mode = common.RenderMode.LIST
else:
render_mode = common.RenderMode.TABLE
return common.format_flow(
self.flow,
self.flow is self.master.view.focus.flow,
render_mode=render_mode,
focused=self.flow is self.master.view.focus.flow,
hostheader=self.master.options.showhost,
cols=cols,
layout=self.master.options.console_flowlist_layout
)
def selectable(self):

View File

@ -35,11 +35,8 @@ class FlowViewHeader(urwid.WidgetWrap):
if self.master.view.focus.flow:
self._w = common.format_flow(
self.master.view.focus.flow,
False,
extended=True,
render_mode=common.RenderMode.DETAILVIEW,
hostheader=self.master.options.showhost,
cols=cols,
layout=self.master.options.console_flowlist_layout
)
else:
self._w = urwid.Pile([])

View File

@ -22,9 +22,8 @@ class Palette:
'option_selected_key',
# List and Connections
'method',
'method_get', 'method_post', 'method_delete', 'method_other', 'method_head', 'method_put', 'method_http2_push',
'scheme_http', 'scheme_https', 'scheme_other',
'scheme_http', 'scheme_https', 'scheme_tcp', 'scheme_other',
'url_punctuation', 'url_domain', 'url_filename', 'url_extension', 'url_query_key', 'url_query_value',
'content_none', 'content_text', 'content_script', 'content_media', 'content_data', 'content_raw', 'content_other',
'focus',
@ -121,7 +120,6 @@ class LowDark(Palette):
option_active_selected = ('light red', 'light gray'),
# List and Connections
method = ('dark cyan', 'default'),
method_get = ('light green', 'default'),
method_post = ('brown', 'default'),
method_delete = ('light red', 'default'),
@ -132,6 +130,7 @@ class LowDark(Palette):
scheme_http = ('dark cyan', 'default'),
scheme_https = ('dark green', 'default'),
scheme_tcp=('dark magenta', 'default'),
scheme_other = ('dark magenta', 'default'),
url_punctuation = ('light gray', 'default'),
@ -221,7 +220,6 @@ class LowLight(Palette):
option_active_selected = ('light red', 'light gray'),
# List and Connections
method = ('dark cyan', 'default'),
method_get = ('dark green', 'default'),
method_post = ('brown', 'default'),
method_head = ('dark cyan', 'default'),
@ -232,6 +230,7 @@ class LowLight(Palette):
scheme_http = ('dark cyan', 'default'),
scheme_https = ('light green', 'default'),
scheme_tcp=('light magenta', 'default'),
scheme_other = ('light magenta', 'default'),
url_punctuation = ('dark gray', 'default'),
@ -340,7 +339,6 @@ class SolarizedLight(LowLight):
# List and Connections
method = ('dark cyan', 'default'),
method_get = (sol_green, 'default'),
method_post = (sol_orange, 'default'),
method_head = (sol_cyan, 'default'),
@ -351,6 +349,7 @@ class SolarizedLight(LowLight):
scheme_http = (sol_cyan, 'default'),
scheme_https = ('light green', 'default'),
scheme_tcp=('light magenta', 'default'),
scheme_other = ('light magenta', 'default'),
url_punctuation = ('dark gray', 'default'),
@ -416,7 +415,6 @@ class SolarizedDark(LowDark):
# List and Connections
focus = (sol_base1, 'default'),
method = (sol_cyan, 'default'),
method_get = (sol_green, 'default'),
method_post = (sol_orange, 'default'),
method_delete = (sol_red, 'default'),