Tabular list flow

This commit is contained in:
BkPHcgQL3V 2018-12-19 17:52:32 +00:00 committed by Jesson Soto Ventura
parent 0b0b4ccba6
commit 998d150c1e
4 changed files with 355 additions and 74 deletions

View File

@ -1,5 +1,8 @@
import platform import platform
import typing import typing
import datetime
import time
import math
from functools import lru_cache from functools import lru_cache
import urwid import urwid
@ -97,12 +100,171 @@ if urwid.util.detected_encoding and not IS_WSL:
SYMBOL_MARK = u"\u25cf" SYMBOL_MARK = u"\u25cf"
SYMBOL_UP = u"\u21E7" SYMBOL_UP = u"\u21E7"
SYMBOL_DOWN = u"\u21E9" SYMBOL_DOWN = u"\u21E9"
SYMBOL_ELLIPSIS = u"\u2026"
else: else:
SYMBOL_REPLAY = u"[r]" SYMBOL_REPLAY = u"[r]"
SYMBOL_RETURN = u"<-" SYMBOL_RETURN = u"<-"
SYMBOL_MARK = "[m]" SYMBOL_MARK = "[m]"
SYMBOL_UP = "^" SYMBOL_UP = "^"
SYMBOL_DOWN = " " SYMBOL_DOWN = " "
SYMBOL_ELLIPSIS = "~"
def fixlen(s, maxlen):
if len(s) <= maxlen:
return s.ljust(maxlen)
else:
return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS
def fixlen_r(s, maxlen):
if len(s) <= maxlen:
return s.rjust(maxlen)
else:
return SYMBOL_ELLIPSIS + s[len(s) - maxlen + len(SYMBOL_ELLIPSIS):]
class TruncatedText(urwid.Widget):
def __init__(self, text, attr, align='left'):
self.text = text
self.attr = attr
self.align = align
super(TruncatedText, self).__init__()
def pack(self, size, focus=False):
return (len(self.text), 1)
def rows(self, size, focus=False):
return 1
def render(self, size, focus=False):
text = self.text
attr = self.attr
if self.align == 'right':
text = text[::-1]
attr = attr[::-1]
text_len = len(text) # TODO: unicode?
if size is not None and len(size) > 0:
width = size[0]
else:
width = text_len
if width >= text_len:
remaining = width - text_len
if remaining > 0:
c_text = text + ' ' * remaining
c_attr = attr + [('text', remaining)]
else:
c_text = text
c_attr = attr
else:
visible_len = width - len(SYMBOL_ELLIPSIS)
visible_text = text[0:visible_len]
c_text = visible_text + SYMBOL_ELLIPSIS
c_attr = (urwid.util.rle_subseg(attr, 0, len(visible_text.encode())) +
[('focus', len(SYMBOL_ELLIPSIS.encode()))])
if self.align == 'right':
c_text = c_text[::-1]
c_attr = c_attr[::-1]
return urwid.TextCanvas([c_text.encode()], [c_attr], maxcol=width)
def truncated_plain(text, attr, align='left'):
return TruncatedText(text, [(attr, len(text.encode()))], align)
# Work around https://github.com/urwid/urwid/pull/330
def rle_append_beginning_modify(rle, a_r):
"""
Append (a, r) (unpacked from *a_r*) to BEGINNING of rle.
Merge with first run when possible
MODIFIES rle parameter contents. Returns None.
"""
a, r = a_r
if not rle:
rle[:] = [(a, r)]
else:
al, run = rle[0]
if a == al:
rle[0] = (a, run + r)
else:
rle[0:0] = [(a, r)]
def colorize_host(s):
if len(s) == 0 or s[0] == '[' or s.split('.')[-1].isdigit():
main_part = -1
else:
main_part = 1 # TODO: second-level domains (https://publicsuffix.org/list/)
part = 0
attr = []
for i in reversed(range(len(s))):
c = s[i]
if c == '.':
part += 1
if c in ".:[]":
a = 'url_punctuation'
elif part == main_part:
a = 'url_domain'
else:
a = 'text'
rle_append_beginning_modify(attr, (a, len(c.encode())))
return attr
def colorize_req(s):
path = s.split('?', 2)[0]
i_query = len(path)
i_last_slash = path.rfind('/')
i_ext = path[i_last_slash + 1:].rfind('.')
i_ext = i_last_slash + i_ext if i_ext >= 0 else len(s)
in_val = False
attr = []
for i in range(len(s)):
c = s[i]
if ((i < i_query and c == '/') or
(i < i_query and i > i_last_slash and c == '.') or
(i == i_query)):
a = 'url_punctuation'
elif i > i_query:
if in_val:
if c == '&':
in_val = False
a = 'url_punctuation'
else:
a = 'url_query_value'
else:
if c == '=':
in_val = True
a = 'url_punctuation'
else:
a = 'url_query_key'
elif i > i_ext:
a = 'url_extension'
elif i > i_last_slash:
a = 'url_filename'
else:
a = 'text'
urwid.util.rle_append_modify(attr, (a, len(c.encode())))
return attr
def colorize_url(url):
parts = url.split('/', 3)
if len(parts) < 4 or len(parts[1]) > 0 or parts[0][-1:] != ':':
return [('error', len(url))] # bad URL
schemes = {
'http:': 'scheme_http',
'https:': 'scheme_https',
}
return [
(schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1),
('url_punctuation', 3), # ://
] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
@lru_cache(maxsize=800) @lru_cache(maxsize=800)
@ -110,50 +272,71 @@ def raw_format_flow(f):
f = dict(f) f = dict(f)
pile = [] pile = []
req = [] req = []
if f["extended"]:
req.append(
fcol(
human.format_timestamp(f["req_timestamp"]),
"highlight"
)
)
else:
req.append(fcol(">>" if f["focus"] else " ", "focus"))
if f["marked"]: cursor = [' ', 'focus']
req.append(fcol(SYMBOL_MARK, "mark")) if f.get('resp_is_replay', False):
cursor[0] = SYMBOL_REPLAY
cursor[1] = 'replay'
if f['marked']:
if cursor[0] == ' ':
cursor[0] = SYMBOL_MARK
cursor[1] = 'mark'
if f['focus']:
cursor[0] = '>'
if f["req_is_replay"]: req.append(fcol(*cursor))
req.append(fcol(SYMBOL_REPLAY, "replay"))
req.append(fcol(f["req_method"], "method")) if f["two_line"]:
req.append(TruncatedText(f["req_url"], colorize_url(f["req_url"]), 'left'))
pile.append(urwid.Columns(req, dividechars=1))
preamble = sum(i[1] for i in req) + len(req) - 1 req = []
req.append(fcol(' ', 'text'))
if f["intercepted"] and not f["acked"]: if f["intercepted"] and not f["acked"]:
uc = "intercept" uc = "intercept"
elif "resp_code" in f or "err_msg" in f: elif "resp_code" in f or f["err_msg"] is not None:
uc = "text" uc = "highlight"
else: else:
uc = "title" uc = "title"
url = f["req_url"] if f["extended"]:
s = human.format_timestamp(f["req_timestamp"])
else:
s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(f["req_timestamp"]))).strftime("%H:%M:%S")
req.append(fcol(s, uc))
if f["max_url_len"] and len(url) > f["max_url_len"]: methods = {
url = url[:f["max_url_len"]] + "" 'GET': 'method_get',
'POST': 'method_post',
}
uc = methods.get(f["req_method"], "method_other")
if f['extended']:
req.append(fcol(f["req_method"], uc))
if f["req_promise"]:
req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
else:
if f["req_promise"]:
uc = 'method_http2_push'
req.append(("fixed", 4, truncated_plain(f["req_method"], uc)))
if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): if f["two_line"]:
url += " " + f["req_http_version"] req.append(fcol(f["req_http_version"], 'text'))
req.append( else:
urwid.Text([(uc, url)]) schemes = {
) 'http': 'scheme_http',
'https': 'scheme_https',
}
req.append(fcol(fixlen(f["req_scheme"].upper(), 5), schemes.get(f["req_scheme"], "scheme_other")))
pile.append(urwid.Columns(req, dividechars=1)) req.append(('weight', 0.25, TruncatedText(f["req_host"], colorize_host(f["req_host"]), 'right')))
req.append(('weight', 1.0, TruncatedText(f["req_path"], colorize_req(f["req_path"]), 'left')))
resp = [] ret = (' ' * len(SYMBOL_RETURN), 'text')
resp.append( status = ('', 'text')
("fixed", preamble, urwid.Text("")) content = ('', 'text')
) size = ('', 'text')
duration = ('', 'text')
if "resp_code" in f: if "resp_code" in f:
codes = { codes = {
@ -163,79 +346,112 @@ def raw_format_flow(f):
5: "code_500", 5: "code_500",
} }
ccol = codes.get(f["resp_code"] // 100, "code_other") ccol = codes.get(f["resp_code"] // 100, "code_other")
resp.append(fcol(SYMBOL_RETURN, ccol)) ret = (SYMBOL_RETURN, ccol)
if f["resp_is_replay"]: status = (str(f["resp_code"]), ccol)
resp.append(fcol(SYMBOL_REPLAY, "replay"))
resp.append(fcol(f["resp_code"], ccol)) if f["resp_len"] < 0:
if f["extended"]:
resp.append(fcol(f["resp_reason"], ccol))
if f["intercepted"] and f["resp_code"] and not f["acked"]: if f["intercepted"] and f["resp_code"] and not f["acked"]:
rc = "intercept" rc = "intercept"
else: else:
rc = "text" rc = "content_none"
if f["resp_len"] == -1:
contentdesc = "[content missing]"
else:
contentdesc = "[no content]"
content = (contentdesc, rc)
else:
if f["resp_ctype"]: if f["resp_ctype"]:
resp.append(fcol(f["resp_ctype"], rc)) ctype = f["resp_ctype"].split(";")[0]
resp.append(fcol(f["resp_clen"], rc)) if ctype.endswith('/javascript'):
resp.append(fcol(f["roundtrip"], rc)) rc = 'content_script'
elif ctype.startswith('text/'):
rc = 'content_text'
elif (ctype.startswith('image/') or
ctype.startswith('video/') or
ctype.startswith('font/') or
"/x-font-" in ctype):
rc = 'content_media'
elif ctype.endswith('/json') or ctype.endswith('/xml'):
rc = 'content_data'
elif ctype.startswith('application/'):
rc = 'content_raw'
else:
rc = 'content_other'
content = (ctype, rc)
rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + f["resp_len"]) / 20, 0.99))
size_str = human.pretty_size(f["resp_len"])
if not f['extended']:
# shorten to 5 chars max
if len(size_str) > 5:
size_str = size_str[0:4].rstrip('.') + size_str[-1:]
size = (size_str, rc)
if f['duration'] is not None:
rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * f['duration']) / 12, 0.99))
duration = (human.pretty_duration(f['duration']), rc)
elif f["err_msg"]: elif f["err_msg"]:
resp.append(fcol(SYMBOL_RETURN, "error")) status = ('Err', 'error')
resp.append( content = f["err_msg"], 'error'
urwid.Text([
( if f["two_line"]:
"error", req.append(fcol(*ret))
f["err_msg"] req.append(fcol(fixlen(status[0], 3), status[1]))
) req.append(('weight', 0.15, truncated_plain(content[0], content[1], 'right')))
]) if f['extended']:
) req.append(fcol(*size))
pile.append(urwid.Columns(resp, dividechars=1)) else:
req.append(fcol(fixlen_r(size[0], 5), size[1]))
req.append(fcol(fixlen_r(duration[0], 5), duration[1]))
pile.append(urwid.Columns(req, dividechars=1, min_width=15))
return urwid.Pile(pile) return urwid.Pile(pile)
def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False): def format_flow(f, focus, extended=False, hostheader=False, cols=False):
acked = False acked = False
if f.reply and f.reply.state == "committed": if f.reply and f.reply.state == "committed":
acked = True acked = True
pushed = ' PUSH_PROMISE' if 'h2-pushed-stream' in f.metadata else ''
d = dict( d = dict(
focus=focus, focus=focus,
extended=extended, extended=extended,
max_url_len=max_url_len, two_line=extended or cols < 100,
intercepted=f.intercepted, intercepted=f.intercepted,
acked=acked, acked=acked,
req_timestamp=f.request.timestamp_start, req_timestamp=f.request.timestamp_start,
req_is_replay=f.request.is_replay, req_is_replay=f.request.is_replay,
req_method=f.request.method + pushed, req_method=f.request.method,
req_promise='h2-pushed-stream' in f.metadata,
req_url=f.request.pretty_url if hostheader else f.request.url, req_url=f.request.pretty_url if hostheader else f.request.url,
req_scheme=f.request.scheme,
req_host=f.request.pretty_host if hostheader else f.request.host,
req_path=f.request.path,
req_http_version=f.request.http_version, req_http_version=f.request.http_version,
err_msg=f.error.msg if f.error else None, err_msg=f.error.msg if f.error else None,
marked=f.marked, marked=f.marked,
) )
if f.response: if f.response:
if f.response.raw_content: if f.response.raw_content:
contentdesc = human.pretty_size(len(f.response.raw_content)) content_len = len(f.response.raw_content)
elif f.response.raw_content is None: elif f.response.raw_content is None:
contentdesc = "[content missing]" content_len = -1
else: else:
contentdesc = "[no content]" content_len = -2
duration = 0 duration = None
if f.response.timestamp_end and f.request.timestamp_start: if f.response.timestamp_end and f.request.timestamp_start:
duration = f.response.timestamp_end - f.request.timestamp_start duration = f.response.timestamp_end - f.request.timestamp_start
roundtrip = human.pretty_duration(duration)
d.update(dict( d.update(dict(
resp_code=f.response.status_code, resp_code=f.response.status_code,
resp_reason=f.response.reason, resp_reason=f.response.reason,
resp_is_replay=f.response.is_replay, resp_is_replay=f.response.is_replay,
resp_clen=contentdesc, resp_len=content_len,
roundtrip=roundtrip, resp_ctype=f.response.headers.get("content-type"),
duration=duration,
)) ))
t = f.response.headers.get("content-type")
if t:
d["resp_ctype"] = t.split(";")[0]
else:
d["resp_ctype"] = ""
return raw_format_flow(tuple(sorted(d.items()))) return raw_format_flow(tuple(sorted(d.items())))

View File

@ -18,7 +18,7 @@ class FlowItem(urwid.WidgetWrap):
self.flow, self.flow,
self.flow is self.master.view.focus.flow, self.flow is self.master.view.focus.flow,
hostheader=self.master.options.showhost, hostheader=self.master.options.showhost,
max_url_len=cols, cols=cols,
) )
def selectable(self): def selectable(self):

View File

@ -38,7 +38,7 @@ class FlowViewHeader(urwid.WidgetWrap):
False, False,
extended=True, extended=True,
hostheader=self.master.options.showhost, hostheader=self.master.options.showhost,
max_url_len=cols, cols=cols,
) )
else: else:
self._w = urwid.Pile([]) self._w = urwid.Pile([])

View File

@ -22,7 +22,12 @@ class Palette:
'option_selected_key', 'option_selected_key',
# List and Connections # List and Connections
'method', 'focus', 'method',
'method_get', 'method_post', 'method_other', 'method_http2_push',
'scheme_http', 'scheme_https', 'scheme_other',
'url_punctuation', 'url_domain', 'url_filename', 'url_extension', 'url_query_key', 'url_query_value',
'content_none', 'content_text', 'content_script', 'content_media', 'content_data', 'content_raw', 'content_other',
'focus',
'code_200', 'code_300', 'code_400', 'code_500', 'code_other', 'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
'error', "warn", "alert", 'error', "warn", "alert",
'header', 'highlight', 'intercept', 'replay', 'mark', 'header', 'highlight', 'intercept', 'replay', 'mark',
@ -36,6 +41,7 @@ class Palette:
# Commander # Commander
'commander_command', 'commander_invalid', 'commander_hint' 'commander_command', 'commander_invalid', 'commander_hint'
] ]
_fields.extend(['gradient_%02d' % i for i in range(100)])
high: typing.Mapping[str, typing.Sequence[str]] = None high: typing.Mapping[str, typing.Sequence[str]] = None
def palette(self, transparent): def palette(self, transparent):
@ -68,6 +74,27 @@ class Palette:
return l return l
def gen_gradient(palette, cols):
for i in range(100):
palette['gradient_%02d' % i] = (cols[i * len(cols) // 100], 'default')
def gen_rgb_gradient(palette, cols):
parts = len(cols) - 1
for i in range(100):
p = i / 100
idx = int(p * parts)
t0 = cols[idx]
t1 = cols[idx + 1]
pp = p * parts % 1
t = (
round(t0[0] + (t1[0] - t0[0]) * pp),
round(t0[1] + (t1[1] - t0[1]) * pp),
round(t0[2] + (t1[2] - t0[2]) * pp),
)
palette['gradient_%02d' % i] = ("#%x%x%x" % t, 'default')
class LowDark(Palette): class LowDark(Palette):
""" """
@ -95,6 +122,30 @@ class LowDark(Palette):
# List and Connections # List and Connections
method = ('dark cyan', 'default'), method = ('dark cyan', 'default'),
method_get = ('dark cyan', 'default'),
method_post = ('dark red', 'default'),
method_other = ('dark magenta', 'default'),
method_http2_push = ('dark gray', 'default'),
scheme_http = ('dark cyan', 'default'),
scheme_https = ('dark green', 'default'),
scheme_other = ('dark magenta', 'default'),
url_punctuation = ('dark gray', 'default'),
url_domain = ('white', 'default'),
url_filename = ('dark cyan', 'default'),
url_extension = ('light gray', 'default'),
url_query_key = ('white', 'default'),
url_query_value = ('light gray', 'default'),
content_none = ('dark gray', 'default'),
content_text = ('light gray', 'default'),
content_script = ('dark green', 'default'),
content_media = ('light blue', 'default'),
content_data = ('brown', 'default'),
content_raw = ('dark red', 'default'),
content_other = ('dark magenta', 'default'),
focus = ('yellow', 'default'), focus = ('yellow', 'default'),
code_200 = ('dark green', 'default'), code_200 = ('dark green', 'default'),
@ -127,6 +178,7 @@ class LowDark(Palette):
commander_invalid = ('light red', 'default'), commander_invalid = ('light red', 'default'),
commander_hint = ('dark gray', 'default'), commander_hint = ('dark gray', 'default'),
) )
gen_gradient(low, ['light red', 'yellow', 'light green', 'dark green', 'dark cyan', 'dark blue'])
class Dark(LowDark): class Dark(LowDark):
@ -312,8 +364,20 @@ class SolarizedDark(LowDark):
# List and Connections # List and Connections
method = (sol_cyan, 'default'), method = (sol_cyan, 'default'),
method_http2_push = (sol_base01, 'default'),
focus = (sol_base1, 'default'), focus = (sol_base1, 'default'),
url_punctuation = ('h242', 'default'),
url_domain = ('h252', 'default'),
url_filename = ('h132', 'default'),
url_extension = ('h96', 'default'),
url_query_key = ('h37', 'default'),
url_query_value = ('h30', 'default'),
content_none = (sol_base01, 'default'),
content_text = (sol_base1, 'default'),
content_media = (sol_blue, 'default'),
code_200 = (sol_green, 'default'), code_200 = (sol_green, 'default'),
code_300 = (sol_blue, 'default'), code_300 = (sol_blue, 'default'),
code_400 = (sol_orange, 'default',), code_400 = (sol_orange, 'default',),
@ -342,6 +406,7 @@ class SolarizedDark(LowDark):
commander_invalid = (sol_orange, 'default'), commander_invalid = (sol_orange, 'default'),
commander_hint = (sol_base00, 'default'), commander_hint = (sol_base00, 'default'),
) )
gen_rgb_gradient(high, [(15, 0, 0), (15, 15, 0), (0, 15, 0), (0, 15, 15), (0, 0, 15)])
DEFAULT = "dark" DEFAULT = "dark"