mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2025-02-02 00:05:27 +00:00
commit
2774928319
@ -127,15 +127,18 @@ class ClientPlayback:
|
||||
self.q = queue.Queue()
|
||||
self.thread: RequestReplayThread = None
|
||||
|
||||
def check(self, f: http.HTTPFlow):
|
||||
def check(self, f: flow.Flow):
|
||||
if f.live:
|
||||
return "Can't replay live flow."
|
||||
if f.intercepted:
|
||||
return "Can't replay intercepted flow."
|
||||
if not f.request:
|
||||
return "Can't replay flow with missing request."
|
||||
if f.request.raw_content is None:
|
||||
return "Can't replay flow with missing content."
|
||||
if isinstance(f, http.HTTPFlow):
|
||||
if not f.request:
|
||||
return "Can't replay flow with missing request."
|
||||
if f.request.raw_content is None:
|
||||
return "Can't replay flow with missing content."
|
||||
else:
|
||||
return "Can only replay HTTP flows."
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
|
@ -21,7 +21,10 @@ from mitmproxy import command
|
||||
from mitmproxy import connections
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import io
|
||||
from mitmproxy import http # noqa
|
||||
from mitmproxy import http
|
||||
from mitmproxy import tcp
|
||||
from mitmproxy.utils import human
|
||||
|
||||
|
||||
# The underlying sorted list implementation expects the sort key to be stable
|
||||
# for the lifetime of the object. However, if we sort by size, for instance,
|
||||
@ -38,7 +41,7 @@ class _OrderKey:
|
||||
def __init__(self, view):
|
||||
self.view = view
|
||||
|
||||
def generate(self, f: http.HTTPFlow) -> typing.Any: # pragma: no cover
|
||||
def generate(self, f: mitmproxy.flow.Flow) -> typing.Any: # pragma: no cover
|
||||
pass
|
||||
|
||||
def refresh(self, f):
|
||||
@ -68,32 +71,49 @@ class _OrderKey:
|
||||
|
||||
|
||||
class OrderRequestStart(_OrderKey):
|
||||
def generate(self, f: http.HTTPFlow) -> int:
|
||||
return f.request.timestamp_start or 0
|
||||
def generate(self, f: mitmproxy.flow.Flow) -> float:
|
||||
return f.timestamp_start
|
||||
|
||||
|
||||
class OrderRequestMethod(_OrderKey):
|
||||
def generate(self, f: http.HTTPFlow) -> str:
|
||||
return f.request.method
|
||||
def generate(self, f: mitmproxy.flow.Flow) -> str:
|
||||
if isinstance(f, http.HTTPFlow):
|
||||
return f.request.method
|
||||
elif isinstance(f, tcp.TCPFlow):
|
||||
return "TCP"
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class OrderRequestURL(_OrderKey):
|
||||
def generate(self, f: http.HTTPFlow) -> str:
|
||||
return f.request.url
|
||||
def generate(self, f: mitmproxy.flow.Flow) -> str:
|
||||
if isinstance(f, http.HTTPFlow):
|
||||
return f.request.url
|
||||
elif isinstance(f, tcp.TCPFlow):
|
||||
return human.format_address(f.server_conn.address)
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class OrderKeySize(_OrderKey):
|
||||
def generate(self, f: http.HTTPFlow) -> int:
|
||||
s = 0
|
||||
if f.request.raw_content:
|
||||
s += len(f.request.raw_content)
|
||||
if f.response and f.response.raw_content:
|
||||
s += len(f.response.raw_content)
|
||||
return s
|
||||
def generate(self, f: mitmproxy.flow.Flow) -> int:
|
||||
if isinstance(f, http.HTTPFlow):
|
||||
size = 0
|
||||
if f.request.raw_content:
|
||||
size += len(f.request.raw_content)
|
||||
if f.response and f.response.raw_content:
|
||||
size += len(f.response.raw_content)
|
||||
return size
|
||||
elif isinstance(f, tcp.TCPFlow):
|
||||
size = 0
|
||||
for message in f.messages:
|
||||
size += len(message.content)
|
||||
return size
|
||||
else:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
matchall = flowfilter.parse(".")
|
||||
|
||||
matchall = flowfilter.parse("~http | ~tcp")
|
||||
|
||||
orders = [
|
||||
("t", "time"),
|
||||
@ -555,6 +575,18 @@ class View(collections.abc.Sequence):
|
||||
def kill(self, f):
|
||||
self.update([f])
|
||||
|
||||
def tcp_start(self, f):
|
||||
self.add([f])
|
||||
|
||||
def tcp_message(self, f):
|
||||
self.update([f])
|
||||
|
||||
def tcp_error(self, f):
|
||||
self.update([f])
|
||||
|
||||
def tcp_end(self, f):
|
||||
self.update([f])
|
||||
|
||||
def update(self, flows: typing.Sequence[mitmproxy.flow.Flow]) -> None:
|
||||
"""
|
||||
Updates a list of flows. If flow is not in the state, it's ignored.
|
||||
|
@ -180,3 +180,8 @@ class Flow(stateobject.StateObject):
|
||||
if self.reply.state == "taken":
|
||||
self.reply.ack()
|
||||
self.reply.commit()
|
||||
|
||||
@property
|
||||
def timestamp_start(self) -> float:
|
||||
"""Start time of the flow."""
|
||||
return self.client_conn.timestamp_start
|
||||
|
@ -173,6 +173,10 @@ class HTTPFlow(flow.Flow):
|
||||
s += ">"
|
||||
return s.format(flow=self)
|
||||
|
||||
@property
|
||||
def timestamp_start(self) -> float:
|
||||
return self.request.timestamp_start
|
||||
|
||||
def copy(self):
|
||||
f = super().copy()
|
||||
if self.request:
|
||||
|
@ -1,7 +1,6 @@
|
||||
import enum
|
||||
import platform
|
||||
import typing
|
||||
import datetime
|
||||
import time
|
||||
import math
|
||||
from functools import lru_cache
|
||||
from publicsuffix2 import get_sld, get_tld
|
||||
@ -9,7 +8,10 @@ from publicsuffix2 import get_sld, get_tld
|
||||
import urwid
|
||||
import urwid.util
|
||||
|
||||
from mitmproxy import flow
|
||||
from mitmproxy.http import HTTPFlow
|
||||
from mitmproxy.utils import human
|
||||
from mitmproxy.tcp import TCPFlow
|
||||
|
||||
# Detect Windows Subsystem for Linux
|
||||
IS_WSL = "Microsoft" in platform.platform()
|
||||
@ -82,7 +84,7 @@ def format_keyvals(
|
||||
return ret
|
||||
|
||||
|
||||
def fcol(s, attr):
|
||||
def fcol(s: str, attr: str) -> typing.Tuple[str, int, urwid.Text]:
|
||||
s = str(s)
|
||||
return (
|
||||
"fixed",
|
||||
@ -105,20 +107,48 @@ if urwid.util.detected_encoding:
|
||||
else:
|
||||
SYMBOL_REPLAY = u"[r]"
|
||||
SYMBOL_RETURN = u"<-"
|
||||
SYMBOL_MARK = "[m]"
|
||||
SYMBOL_MARK = "#"
|
||||
SYMBOL_UP = "^"
|
||||
SYMBOL_DOWN = " "
|
||||
SYMBOL_ELLIPSIS = "~"
|
||||
|
||||
SCHEME_STYLES = {
|
||||
'http': 'scheme_http',
|
||||
'https': 'scheme_https',
|
||||
'tcp': 'scheme_tcp',
|
||||
}
|
||||
HTTP_REQUEST_METHOD_STYLES = {
|
||||
'GET': 'method_get',
|
||||
'POST': 'method_post',
|
||||
'DELETE': 'method_delete',
|
||||
'HEAD': 'method_head',
|
||||
'PUT': 'method_put'
|
||||
}
|
||||
HTTP_RESPONSE_CODE_STYLE = {
|
||||
2: "code_200",
|
||||
3: "code_300",
|
||||
4: "code_400",
|
||||
5: "code_500",
|
||||
}
|
||||
|
||||
def fixlen(s, maxlen):
|
||||
|
||||
class RenderMode(enum.Enum):
|
||||
TABLE = 1
|
||||
"""The flow list in table format, i.e. one row per flow."""
|
||||
LIST = 2
|
||||
"""The flow list in list format, i.e. potentially multiple rows per flow."""
|
||||
DETAILVIEW = 3
|
||||
"""The top lines in the detail view."""
|
||||
|
||||
|
||||
def fixlen(s: str, maxlen: int) -> str:
|
||||
if len(s) <= maxlen:
|
||||
return s.ljust(maxlen)
|
||||
else:
|
||||
return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS
|
||||
|
||||
|
||||
def fixlen_r(s, maxlen):
|
||||
def fixlen_r(s: str, maxlen: int) -> str:
|
||||
if len(s) <= maxlen:
|
||||
return s.rjust(maxlen)
|
||||
else:
|
||||
@ -233,8 +263,8 @@ def colorize_req(s):
|
||||
for i in range(len(s)):
|
||||
c = s[i]
|
||||
if ((i < i_query and c == '/') or
|
||||
(i < i_query and i > i_last_slash and c == '.') or
|
||||
(i == i_query)):
|
||||
(i < i_query and i > i_last_slash and c == '.') or
|
||||
(i == i_query)):
|
||||
a = 'url_punctuation'
|
||||
elif i > i_query:
|
||||
if in_val:
|
||||
@ -268,294 +298,435 @@ def colorize_url(url):
|
||||
'https:': 'scheme_https',
|
||||
}
|
||||
return [
|
||||
(schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1),
|
||||
('url_punctuation', 3), # ://
|
||||
] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
|
||||
(schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1),
|
||||
('url_punctuation', 3), # ://
|
||||
] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
|
||||
|
||||
|
||||
def format_http_content_type(content_type: str) -> typing.Tuple[str, str]:
|
||||
content_type = content_type.split(";")[0]
|
||||
if content_type.endswith('/javascript'):
|
||||
style = 'content_script'
|
||||
elif content_type.startswith('text/'):
|
||||
style = 'content_text'
|
||||
elif (content_type.startswith('image/') or
|
||||
content_type.startswith('video/') or
|
||||
content_type.startswith('font/') or
|
||||
"/x-font-" in content_type):
|
||||
style = 'content_media'
|
||||
elif content_type.endswith('/json') or content_type.endswith('/xml'):
|
||||
style = 'content_data'
|
||||
elif content_type.startswith('application/'):
|
||||
style = 'content_raw'
|
||||
else:
|
||||
style = 'content_other'
|
||||
return content_type, style
|
||||
|
||||
|
||||
def format_duration(duration: float) -> typing.Tuple[str, str]:
|
||||
pretty_duration = human.pretty_duration(duration)
|
||||
style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * duration) / 12, 0.99))
|
||||
return pretty_duration, style
|
||||
|
||||
|
||||
def format_size(num_bytes: int) -> typing.Tuple[str, str]:
|
||||
pretty_size = human.pretty_size(num_bytes)
|
||||
style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + num_bytes) / 20, 0.99))
|
||||
return pretty_size, style
|
||||
|
||||
|
||||
def format_left_indicators(
|
||||
*,
|
||||
focused: bool,
|
||||
intercepted: bool,
|
||||
timestamp: float
|
||||
):
|
||||
indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = []
|
||||
if focused:
|
||||
indicators.append(("focus", ">>"))
|
||||
else:
|
||||
indicators.append(" ")
|
||||
pretty_timestamp = human.format_timestamp(timestamp)[-8:]
|
||||
if intercepted:
|
||||
indicators.append(("intercept", pretty_timestamp))
|
||||
else:
|
||||
indicators.append(("text", pretty_timestamp))
|
||||
return "fixed", 10, urwid.Text(indicators)
|
||||
|
||||
|
||||
def format_right_indicators(
|
||||
*,
|
||||
replay: bool,
|
||||
marked: bool
|
||||
):
|
||||
indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = []
|
||||
if replay:
|
||||
indicators.append(("replay", SYMBOL_REPLAY))
|
||||
else:
|
||||
indicators.append(" ")
|
||||
if marked:
|
||||
indicators.append(("mark", SYMBOL_MARK))
|
||||
else:
|
||||
indicators.append(" ")
|
||||
return "fixed", 2, urwid.Text(indicators)
|
||||
|
||||
|
||||
@lru_cache(maxsize=800)
|
||||
def raw_format_list(f):
|
||||
f = dict(f)
|
||||
pile = []
|
||||
def format_http_flow_list(
|
||||
*,
|
||||
render_mode: RenderMode,
|
||||
focused: bool,
|
||||
marked: bool,
|
||||
request_method: str,
|
||||
request_scheme: str,
|
||||
request_host: str,
|
||||
request_path: str,
|
||||
request_url: str,
|
||||
request_http_version: str,
|
||||
request_timestamp: float,
|
||||
request_is_push_promise: bool,
|
||||
request_is_replay: bool,
|
||||
intercepted: bool,
|
||||
response_code: typing.Optional[int],
|
||||
response_reason: typing.Optional[str],
|
||||
response_content_length: typing.Optional[int],
|
||||
response_content_type: typing.Optional[str],
|
||||
response_is_replay: bool,
|
||||
duration: typing.Optional[float],
|
||||
error_message: typing.Optional[str],
|
||||
) -> urwid.Widget:
|
||||
req = []
|
||||
if f["extended"]:
|
||||
|
||||
if render_mode is RenderMode.DETAILVIEW:
|
||||
req.append(fcol(human.format_timestamp(request_timestamp), "highlight"))
|
||||
else:
|
||||
if focused:
|
||||
req.append(fcol(">>", "focus"))
|
||||
else:
|
||||
req.append(fcol(" ", "focus"))
|
||||
|
||||
method_style = HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
|
||||
req.append(fcol(request_method, method_style))
|
||||
|
||||
if request_is_push_promise:
|
||||
req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
|
||||
|
||||
preamble_len = sum(x[1] for x in req) + len(req) - 1
|
||||
|
||||
if request_http_version not in ("HTTP/1.0", "HTTP/1.1"):
|
||||
request_url += " " + request_http_version
|
||||
if intercepted and not response_code:
|
||||
url_style = "intercept"
|
||||
elif response_code or error_message:
|
||||
url_style = "text"
|
||||
else:
|
||||
url_style = "title"
|
||||
|
||||
if render_mode is RenderMode.DETAILVIEW:
|
||||
req.append(
|
||||
fcol(
|
||||
human.format_timestamp(f["req_timestamp"]),
|
||||
"highlight"
|
||||
)
|
||||
urwid.Text([(url_style, request_url)])
|
||||
)
|
||||
else:
|
||||
req.append(fcol(">>" if f["focus"] else " ", "focus"))
|
||||
req.append(truncated_plain(request_url, url_style))
|
||||
|
||||
if f["marked"]:
|
||||
req.append(fcol(SYMBOL_MARK, "mark"))
|
||||
req.append(format_right_indicators(replay=request_is_replay or response_is_replay, marked=marked))
|
||||
|
||||
if f["req_is_replay"]:
|
||||
req.append(fcol(SYMBOL_REPLAY, "replay"))
|
||||
|
||||
req.append(fcol(f["req_method"], "method"))
|
||||
|
||||
preamble = sum(i[1] for i in req) + len(req) - 1
|
||||
|
||||
if f["intercepted"] and not f["acked"]:
|
||||
uc = "intercept"
|
||||
elif "resp_code" in f or "err_msg" in f:
|
||||
uc = "text"
|
||||
else:
|
||||
uc = "title"
|
||||
|
||||
url = f["req_url"]
|
||||
|
||||
if f["cols"] and len(url) > f["cols"]:
|
||||
url = url[:f["cols"]] + "…"
|
||||
|
||||
if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
|
||||
url += " " + f["req_http_version"]
|
||||
req.append(
|
||||
urwid.Text([(uc, url)])
|
||||
)
|
||||
|
||||
pile.append(urwid.Columns(req, dividechars=1))
|
||||
|
||||
resp = []
|
||||
resp.append(
|
||||
("fixed", preamble, urwid.Text(""))
|
||||
)
|
||||
|
||||
if "resp_code" in f:
|
||||
codes = {
|
||||
2: "code_200",
|
||||
3: "code_300",
|
||||
4: "code_400",
|
||||
5: "code_500",
|
||||
}
|
||||
ccol = codes.get(f["resp_code"] // 100, "code_other")
|
||||
resp.append(fcol(SYMBOL_RETURN, ccol))
|
||||
if f["resp_is_replay"]:
|
||||
resp.append(fcol(SYMBOL_REPLAY, "replay"))
|
||||
resp.append(fcol(f["resp_code"], ccol))
|
||||
if f["extended"]:
|
||||
resp.append(fcol(f["resp_reason"], ccol))
|
||||
if f["intercepted"] and f["resp_code"] and not f["acked"]:
|
||||
rc = "intercept"
|
||||
resp = [
|
||||
("fixed", preamble_len, urwid.Text(""))
|
||||
]
|
||||
if response_code:
|
||||
if intercepted:
|
||||
style = "intercept"
|
||||
else:
|
||||
rc = "text"
|
||||
style = ""
|
||||
|
||||
if f["resp_ctype"]:
|
||||
resp.append(fcol(f["resp_ctype"], rc))
|
||||
resp.append(fcol(f["resp_clen"], rc))
|
||||
pretty_duration = human.pretty_duration(f["duration"])
|
||||
resp.append(fcol(pretty_duration, rc))
|
||||
status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
|
||||
resp.append(fcol(SYMBOL_RETURN, status_style))
|
||||
if response_is_replay:
|
||||
resp.append(fcol(SYMBOL_REPLAY, "replay"))
|
||||
resp.append(fcol(str(response_code), status_style))
|
||||
if response_reason and render_mode is RenderMode.DETAILVIEW:
|
||||
resp.append(fcol(response_reason, status_style))
|
||||
|
||||
elif f["err_msg"]:
|
||||
if response_content_type:
|
||||
ct, ct_style = format_http_content_type(response_content_type)
|
||||
resp.append(fcol(ct, style or ct_style))
|
||||
|
||||
if response_content_length:
|
||||
size, size_style = format_size(response_content_length)
|
||||
elif response_content_length == 0:
|
||||
size = "[no content]"
|
||||
size_style = "text"
|
||||
else:
|
||||
size = "[content missing]"
|
||||
size_style = "text"
|
||||
resp.append(fcol(size, style or size_style))
|
||||
|
||||
if duration:
|
||||
dur, dur_style = format_duration(duration)
|
||||
resp.append(fcol(dur, style or dur_style))
|
||||
elif error_message:
|
||||
resp.append(fcol(SYMBOL_RETURN, "error"))
|
||||
resp.append(
|
||||
urwid.Text([
|
||||
(
|
||||
"error",
|
||||
f["err_msg"]
|
||||
)
|
||||
])
|
||||
)
|
||||
pile.append(urwid.Columns(resp, dividechars=1))
|
||||
return urwid.Pile(pile)
|
||||
resp.append(urwid.Text([("error", error_message)]))
|
||||
|
||||
return urwid.Pile([
|
||||
urwid.Columns(req, dividechars=1),
|
||||
urwid.Columns(resp, dividechars=1)
|
||||
])
|
||||
|
||||
|
||||
@lru_cache(maxsize=800)
|
||||
def raw_format_table(f):
|
||||
f = dict(f)
|
||||
pile = []
|
||||
req = []
|
||||
def format_http_flow_table(
|
||||
*,
|
||||
render_mode: RenderMode,
|
||||
focused: bool,
|
||||
marked: bool,
|
||||
request_method: str,
|
||||
request_scheme: str,
|
||||
request_host: str,
|
||||
request_path: str,
|
||||
request_url: str,
|
||||
request_http_version: str,
|
||||
request_timestamp: float,
|
||||
request_is_push_promise: bool,
|
||||
request_is_replay: bool,
|
||||
intercepted: bool,
|
||||
response_code: typing.Optional[int],
|
||||
response_reason: typing.Optional[str],
|
||||
response_content_length: typing.Optional[int],
|
||||
response_content_type: typing.Optional[str],
|
||||
response_is_replay: bool,
|
||||
duration: typing.Optional[float],
|
||||
error_message: typing.Optional[str],
|
||||
) -> urwid.Widget:
|
||||
items = [
|
||||
format_left_indicators(
|
||||
focused=focused,
|
||||
intercepted=intercepted,
|
||||
timestamp=request_timestamp
|
||||
)
|
||||
]
|
||||
|
||||
cursor = [' ', 'focus']
|
||||
if f['focus']:
|
||||
cursor[0] = '>'
|
||||
req.append(fcol(*cursor))
|
||||
|
||||
if f.get('resp_is_replay', False) or f.get('req_is_replay', False):
|
||||
req.append(fcol(SYMBOL_REPLAY, 'replay'))
|
||||
if f['marked']:
|
||||
req.append(fcol(SYMBOL_MARK, 'mark'))
|
||||
|
||||
if f["two_line"]:
|
||||
req.append(TruncatedText(f["req_url"], colorize_url(f["req_url"]), 'left'))
|
||||
pile.append(urwid.Columns(req, dividechars=1))
|
||||
|
||||
req = []
|
||||
req.append(fcol(' ', 'text'))
|
||||
|
||||
if f["intercepted"] and not f["acked"]:
|
||||
uc = "intercept"
|
||||
elif "resp_code" in f or f["err_msg"] is not None:
|
||||
uc = "highlight"
|
||||
if intercepted and not response_code:
|
||||
request_style = "intercept"
|
||||
else:
|
||||
uc = "title"
|
||||
request_style = ""
|
||||
|
||||
if f["extended"]:
|
||||
s = human.format_timestamp(f["req_timestamp"])
|
||||
scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other")
|
||||
items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style))
|
||||
|
||||
if request_is_push_promise:
|
||||
method_style = 'method_http2_push'
|
||||
else:
|
||||
s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(f["req_timestamp"]))).strftime("%H:%M:%S")
|
||||
req.append(fcol(s, uc))
|
||||
method_style = request_style or HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other")
|
||||
items.append(fcol(fixlen(request_method, 4), method_style))
|
||||
|
||||
methods = {
|
||||
'GET': 'method_get',
|
||||
'POST': 'method_post',
|
||||
'DELETE': 'method_delete',
|
||||
'HEAD': 'method_head',
|
||||
'PUT': 'method_put'
|
||||
}
|
||||
uc = methods.get(f["req_method"], "method_other")
|
||||
if f['extended']:
|
||||
req.append(fcol(f["req_method"], uc))
|
||||
if f["req_promise"]:
|
||||
req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
|
||||
items.append(('weight', 0.25, TruncatedText(request_host, colorize_host(request_host), 'right')))
|
||||
items.append(('weight', 1.0, TruncatedText(request_path, colorize_req(request_path), 'left')))
|
||||
|
||||
if intercepted and response_code:
|
||||
response_style = "intercept"
|
||||
else:
|
||||
if f["req_promise"]:
|
||||
uc = 'method_http2_push'
|
||||
req.append(("fixed", 4, truncated_plain(f["req_method"], uc)))
|
||||
response_style = ""
|
||||
|
||||
if f["two_line"]:
|
||||
req.append(fcol(f["req_http_version"], 'text'))
|
||||
else:
|
||||
schemes = {
|
||||
'http': 'scheme_http',
|
||||
'https': 'scheme_https',
|
||||
}
|
||||
req.append(fcol(fixlen(f["req_scheme"].upper(), 5), schemes.get(f["req_scheme"], "scheme_other")))
|
||||
if response_code:
|
||||
|
||||
req.append(('weight', 0.25, TruncatedText(f["req_host"], colorize_host(f["req_host"]), 'right')))
|
||||
req.append(('weight', 1.0, TruncatedText(f["req_path"], colorize_req(f["req_path"]), 'left')))
|
||||
status = str(response_code)
|
||||
status_style = response_style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other")
|
||||
|
||||
ret = (' ' * len(SYMBOL_RETURN), 'text')
|
||||
status = ('', 'text')
|
||||
content = ('', 'text')
|
||||
size = ('', 'text')
|
||||
duration = ('', 'text')
|
||||
|
||||
if "resp_code" in f:
|
||||
codes = {
|
||||
2: "code_200",
|
||||
3: "code_300",
|
||||
4: "code_400",
|
||||
5: "code_500",
|
||||
}
|
||||
ccol = codes.get(f["resp_code"] // 100, "code_other")
|
||||
ret = (SYMBOL_RETURN, ccol)
|
||||
status = (str(f["resp_code"]), ccol)
|
||||
|
||||
if f["resp_len"] < 0:
|
||||
if f["intercepted"] and f["resp_code"] and not f["acked"]:
|
||||
rc = "intercept"
|
||||
else:
|
||||
rc = "content_none"
|
||||
|
||||
if f["resp_len"] == -1:
|
||||
contentdesc = "[content missing]"
|
||||
else:
|
||||
contentdesc = "[no content]"
|
||||
content = (contentdesc, rc)
|
||||
if response_content_length and response_content_type:
|
||||
content, content_style = format_http_content_type(response_content_type)
|
||||
content_style = response_style or content_style
|
||||
elif response_content_length:
|
||||
content = ''
|
||||
content_style = 'content_none'
|
||||
elif response_content_length == 0:
|
||||
content = "[no content]"
|
||||
content_style = 'content_none'
|
||||
else:
|
||||
if f["resp_ctype"]:
|
||||
ctype = f["resp_ctype"].split(";")[0]
|
||||
if ctype.endswith('/javascript'):
|
||||
rc = 'content_script'
|
||||
elif ctype.startswith('text/'):
|
||||
rc = 'content_text'
|
||||
elif (ctype.startswith('image/') or
|
||||
ctype.startswith('video/') or
|
||||
ctype.startswith('font/') or
|
||||
"/x-font-" in ctype):
|
||||
rc = 'content_media'
|
||||
elif ctype.endswith('/json') or ctype.endswith('/xml'):
|
||||
rc = 'content_data'
|
||||
elif ctype.startswith('application/'):
|
||||
rc = 'content_raw'
|
||||
else:
|
||||
rc = 'content_other'
|
||||
content = (ctype, rc)
|
||||
content = "[content missing]"
|
||||
content_style = 'content_none'
|
||||
|
||||
rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + f["resp_len"]) / 20, 0.99))
|
||||
elif error_message:
|
||||
status = 'err'
|
||||
status_style = 'error'
|
||||
content = error_message
|
||||
content_style = 'error'
|
||||
|
||||
size_str = human.pretty_size(f["resp_len"])
|
||||
if not f['extended']:
|
||||
# shorten to 5 chars max
|
||||
if len(size_str) > 5:
|
||||
size_str = size_str[0:4].rstrip('.') + size_str[-1:]
|
||||
size = (size_str, rc)
|
||||
|
||||
if f['duration'] is not None:
|
||||
rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * f['duration']) / 12, 0.99))
|
||||
duration = (human.pretty_duration(f['duration']), rc)
|
||||
|
||||
elif f["err_msg"]:
|
||||
status = ('Err', 'error')
|
||||
content = f["err_msg"], 'error'
|
||||
|
||||
if f["two_line"]:
|
||||
req.append(fcol(*ret))
|
||||
req.append(fcol(fixlen(status[0], 3), status[1]))
|
||||
req.append(('weight', 0.15, truncated_plain(content[0], content[1], 'right')))
|
||||
if f['extended']:
|
||||
req.append(fcol(*size))
|
||||
else:
|
||||
req.append(fcol(fixlen_r(size[0], 5), size[1]))
|
||||
req.append(fcol(fixlen_r(duration[0], 5), duration[1]))
|
||||
status = ''
|
||||
status_style = 'text'
|
||||
content = ''
|
||||
content_style = ''
|
||||
|
||||
pile.append(urwid.Columns(req, dividechars=1, min_width=15))
|
||||
items.append(fcol(fixlen(status, 3), status_style))
|
||||
items.append(('weight', 0.15, truncated_plain(content, content_style, 'right')))
|
||||
|
||||
return urwid.Pile(pile)
|
||||
if response_content_length:
|
||||
size, size_style = format_size(response_content_length)
|
||||
items.append(fcol(fixlen_r(size, 5), response_style or size_style))
|
||||
else:
|
||||
items.append(("fixed", 5, urwid.Text("")))
|
||||
|
||||
if duration:
|
||||
duration_pretty, duration_style = format_duration(duration)
|
||||
items.append(fcol(fixlen_r(duration_pretty, 5), response_style or duration_style))
|
||||
else:
|
||||
items.append(("fixed", 5, urwid.Text("")))
|
||||
|
||||
items.append(format_right_indicators(
|
||||
replay=request_is_replay or response_is_replay,
|
||||
marked=marked
|
||||
))
|
||||
return urwid.Columns(items, dividechars=1, min_width=15)
|
||||
|
||||
|
||||
def format_flow(f, focus, extended=False, hostheader=False, cols=False, layout='default'):
|
||||
acked = False
|
||||
if f.reply and f.reply.state == "committed":
|
||||
acked = True
|
||||
d = dict(
|
||||
focus=focus,
|
||||
extended=extended,
|
||||
two_line=extended or cols < 100,
|
||||
cols=cols,
|
||||
intercepted=f.intercepted,
|
||||
acked=acked,
|
||||
req_timestamp=f.request.timestamp_start,
|
||||
req_is_replay=f.request.is_replay,
|
||||
req_method=f.request.method,
|
||||
req_promise='h2-pushed-stream' in f.metadata,
|
||||
req_url=f.request.pretty_url if hostheader else f.request.url,
|
||||
req_scheme=f.request.scheme,
|
||||
req_host=f.request.pretty_host if hostheader else f.request.host,
|
||||
req_path=f.request.path,
|
||||
req_http_version=f.request.http_version,
|
||||
err_msg=f.error.msg if f.error else None,
|
||||
marked=f.marked,
|
||||
)
|
||||
if f.response:
|
||||
if f.response.raw_content:
|
||||
content_len = len(f.response.raw_content)
|
||||
contentdesc = human.pretty_size(len(f.response.raw_content))
|
||||
elif f.response.raw_content is None:
|
||||
content_len = -1
|
||||
contentdesc = "[content missing]"
|
||||
@lru_cache(maxsize=800)
|
||||
def format_tcp_flow(
|
||||
*,
|
||||
render_mode: RenderMode,
|
||||
focused: bool,
|
||||
timestamp_start: float,
|
||||
marked: bool,
|
||||
client_address,
|
||||
server_address,
|
||||
total_size: int,
|
||||
duration: typing.Optional[float],
|
||||
error_message: typing.Optional[str],
|
||||
):
|
||||
conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}"
|
||||
|
||||
items = []
|
||||
|
||||
if render_mode in (RenderMode.TABLE, RenderMode.DETAILVIEW):
|
||||
items.append(
|
||||
format_left_indicators(focused=focused, intercepted=False, timestamp=timestamp_start)
|
||||
)
|
||||
else:
|
||||
if focused:
|
||||
items.append(fcol(">>", "focus"))
|
||||
else:
|
||||
content_len = -2
|
||||
contentdesc = "[no content]"
|
||||
items.append(fcol(" ", "focus"))
|
||||
|
||||
duration = None
|
||||
if f.response.timestamp_end and f.request.timestamp_start:
|
||||
duration = max([f.response.timestamp_end - f.request.timestamp_start, 0])
|
||||
if render_mode is RenderMode.TABLE:
|
||||
items.append(fcol("TCP ", SCHEME_STYLES["tcp"]))
|
||||
else:
|
||||
items.append(fcol("TCP", SCHEME_STYLES["tcp"]))
|
||||
|
||||
d.update(dict(
|
||||
resp_code=f.response.status_code,
|
||||
resp_reason=f.response.reason,
|
||||
resp_is_replay=f.response.is_replay,
|
||||
resp_len=content_len,
|
||||
resp_ctype=f.response.headers.get("content-type"),
|
||||
resp_clen=contentdesc,
|
||||
items.append(('weight', 1.0, truncated_plain(conn, "text", 'left')))
|
||||
if error_message:
|
||||
items.append(('weight', 1.0, truncated_plain(error_message, "error", 'left')))
|
||||
|
||||
if total_size:
|
||||
size, size_style = format_size(total_size)
|
||||
items.append(fcol(fixlen_r(size, 5), size_style))
|
||||
else:
|
||||
items.append(("fixed", 5, urwid.Text("")))
|
||||
|
||||
if duration:
|
||||
duration_pretty, duration_style = format_duration(duration)
|
||||
items.append(fcol(fixlen_r(duration_pretty, 5), duration_style))
|
||||
else:
|
||||
items.append(("fixed", 5, urwid.Text("")))
|
||||
|
||||
items.append(format_right_indicators(replay=False, marked=marked))
|
||||
|
||||
return urwid.Pile([
|
||||
urwid.Columns(items, dividechars=1, min_width=15)
|
||||
])
|
||||
|
||||
|
||||
def format_flow(
|
||||
f: flow.Flow,
|
||||
*,
|
||||
render_mode: RenderMode,
|
||||
hostheader: bool = False, # pass options directly if we need more stuff from them
|
||||
focused: bool = True,
|
||||
) -> urwid.Widget:
|
||||
"""
|
||||
This functions calls the proper renderer depending on the flow type.
|
||||
We also want to cache the renderer output, so we extract all attributes
|
||||
relevant for display and call the render with only that. This assures that rows
|
||||
are updated if the flow is changed.
|
||||
"""
|
||||
duration: typing.Optional[float]
|
||||
error_message: typing.Optional[str]
|
||||
if f.error:
|
||||
error_message = f.error.msg
|
||||
else:
|
||||
error_message = None
|
||||
|
||||
if isinstance(f, TCPFlow):
|
||||
total_size = 0
|
||||
for message in f.messages:
|
||||
total_size += len(message.content)
|
||||
if f.messages:
|
||||
duration = f.messages[-1].timestamp - f.timestamp_start
|
||||
else:
|
||||
duration = None
|
||||
return format_tcp_flow(
|
||||
render_mode=render_mode,
|
||||
focused=focused,
|
||||
timestamp_start=f.timestamp_start,
|
||||
marked=f.marked,
|
||||
client_address=f.client_conn.address,
|
||||
server_address=f.server_conn.address,
|
||||
total_size=total_size,
|
||||
duration=duration,
|
||||
))
|
||||
error_message=error_message,
|
||||
)
|
||||
elif isinstance(f, HTTPFlow):
|
||||
intercepted = (
|
||||
f.intercepted and not (f.reply and f.reply.state == "committed")
|
||||
)
|
||||
response_content_length: typing.Optional[int]
|
||||
if f.response:
|
||||
if f.response.raw_content is not None:
|
||||
response_content_length = len(f.response.raw_content)
|
||||
else:
|
||||
response_content_length = None
|
||||
response_code = f.response.status_code
|
||||
response_reason = f.response.reason
|
||||
response_content_type = f.response.headers.get("content-type")
|
||||
response_is_replay = f.response.is_replay
|
||||
if f.response.timestamp_end:
|
||||
duration = max([f.response.timestamp_end - f.request.timestamp_start, 0])
|
||||
else:
|
||||
duration = None
|
||||
else:
|
||||
response_content_length = None
|
||||
response_code = None
|
||||
response_reason = None
|
||||
response_content_type = None
|
||||
response_is_replay = False
|
||||
duration = None
|
||||
|
||||
if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW):
|
||||
render_func = format_http_flow_list
|
||||
else:
|
||||
render_func = format_http_flow_table
|
||||
return render_func(
|
||||
render_mode=render_mode,
|
||||
focused=focused,
|
||||
marked=f.marked,
|
||||
request_method=f.request.method,
|
||||
request_scheme=f.request.scheme,
|
||||
request_host=f.request.pretty_host if hostheader else f.request.host,
|
||||
request_path=f.request.path,
|
||||
request_url=f.request.pretty_url if hostheader else f.request.url,
|
||||
request_http_version=f.request.http_version,
|
||||
request_timestamp=f.request.timestamp_start,
|
||||
request_is_push_promise='h2-pushed-stream' in f.metadata,
|
||||
request_is_replay=f.request.is_replay,
|
||||
intercepted=intercepted,
|
||||
response_code=response_code,
|
||||
response_reason=response_reason,
|
||||
response_content_length=response_content_length,
|
||||
response_content_type=response_content_type,
|
||||
response_is_replay=response_is_replay,
|
||||
duration=duration,
|
||||
error_message=error_message,
|
||||
)
|
||||
|
||||
if ((layout == 'default' and cols < 100) or layout == "list"):
|
||||
return raw_format_list(tuple(sorted(d.items())))
|
||||
else:
|
||||
return raw_format_table(tuple(sorted(d.items())))
|
||||
raise NotImplementedError()
|
||||
|
@ -9,6 +9,7 @@ from mitmproxy import exceptions
|
||||
from mitmproxy import flow
|
||||
from mitmproxy import http
|
||||
from mitmproxy import log
|
||||
from mitmproxy import tcp
|
||||
from mitmproxy.tools.console import keymap
|
||||
from mitmproxy.tools.console import overlay
|
||||
from mitmproxy.tools.console import signals
|
||||
@ -112,7 +113,7 @@ class ConsoleAddon:
|
||||
choices=sorted(console_palettes),
|
||||
)
|
||||
loader.add_option(
|
||||
"console_palette_transparent", bool, False,
|
||||
"console_palette_transparent", bool, True,
|
||||
"Set transparent background for palette."
|
||||
)
|
||||
loader.add_option(
|
||||
@ -334,9 +335,10 @@ class ConsoleAddon:
|
||||
@command.command("console.view.flow")
|
||||
def view_flow(self, flow: flow.Flow) -> None:
|
||||
"""View a flow."""
|
||||
if hasattr(flow, "request"):
|
||||
# FIME: Also set focus?
|
||||
if isinstance(flow, (http.HTTPFlow, tcp.TCPFlow)):
|
||||
self.master.switch_view("flowview")
|
||||
else:
|
||||
ctx.log.warn(f"No detail view for {type(flow).__name__}.")
|
||||
|
||||
@command.command("console.exit")
|
||||
def exit(self) -> None:
|
||||
|
@ -1,5 +1,7 @@
|
||||
import typing
|
||||
import urwid
|
||||
|
||||
import mitmproxy.flow
|
||||
from mitmproxy import http
|
||||
from mitmproxy.tools.console import common, searchable
|
||||
from mitmproxy.utils import human
|
||||
@ -13,13 +15,19 @@ def maybe_timestamp(base, attr):
|
||||
return "active"
|
||||
|
||||
|
||||
def flowdetails(state, flow: http.HTTPFlow):
|
||||
def flowdetails(state, flow: mitmproxy.flow.Flow):
|
||||
text = []
|
||||
|
||||
sc = flow.server_conn
|
||||
cc = flow.client_conn
|
||||
req = flow.request
|
||||
resp = flow.response
|
||||
req: typing.Optional[http.HTTPRequest]
|
||||
resp: typing.Optional[http.HTTPResponse]
|
||||
if isinstance(flow, http.HTTPFlow):
|
||||
req = flow.request
|
||||
resp = flow.response
|
||||
else:
|
||||
req = None
|
||||
resp = None
|
||||
metadata = flow.metadata
|
||||
|
||||
if metadata is not None and len(metadata) > 0:
|
||||
@ -126,6 +134,12 @@ def flowdetails(state, flow: http.HTTPFlow):
|
||||
maybe_timestamp(cc, "timestamp_tls_setup")
|
||||
)
|
||||
)
|
||||
parts.append(
|
||||
(
|
||||
"Client conn. closed",
|
||||
maybe_timestamp(cc, "timestamp_end")
|
||||
)
|
||||
)
|
||||
|
||||
if sc is not None and sc.timestamp_start:
|
||||
parts.append(
|
||||
@ -147,6 +161,12 @@ def flowdetails(state, flow: http.HTTPFlow):
|
||||
maybe_timestamp(sc, "timestamp_tls_setup")
|
||||
)
|
||||
)
|
||||
parts.append(
|
||||
(
|
||||
"Server conn. closed",
|
||||
maybe_timestamp(sc, "timestamp_end")
|
||||
)
|
||||
)
|
||||
|
||||
if req is not None and req.timestamp_start:
|
||||
parts.append(
|
||||
|
@ -14,12 +14,17 @@ class FlowItem(urwid.WidgetWrap):
|
||||
|
||||
def get_text(self):
|
||||
cols, _ = self.master.ui.get_cols_rows()
|
||||
layout = self.master.options.console_flowlist_layout
|
||||
if layout == "list" or (layout == 'default' and cols < 100):
|
||||
render_mode = common.RenderMode.LIST
|
||||
else:
|
||||
render_mode = common.RenderMode.TABLE
|
||||
|
||||
return common.format_flow(
|
||||
self.flow,
|
||||
self.flow is self.master.view.focus.flow,
|
||||
render_mode=render_mode,
|
||||
focused=self.flow is self.master.view.focus.flow,
|
||||
hostheader=self.master.options.showhost,
|
||||
cols=cols,
|
||||
layout=self.master.options.console_flowlist_layout
|
||||
)
|
||||
|
||||
def selectable(self):
|
||||
@ -27,9 +32,8 @@ class FlowItem(urwid.WidgetWrap):
|
||||
|
||||
def mouse_event(self, size, event, button, col, row, focus):
|
||||
if event == "mouse press" and button == 1:
|
||||
if self.flow.request:
|
||||
self.master.commands.execute("console.view.flow @focus")
|
||||
return True
|
||||
self.master.commands.execute("console.view.flow @focus")
|
||||
return True
|
||||
|
||||
def keypress(self, size, key):
|
||||
return key
|
||||
|
@ -5,9 +5,11 @@ from typing import Optional, Union # noqa
|
||||
|
||||
import urwid
|
||||
|
||||
import mitmproxy.flow
|
||||
from mitmproxy import contentviews
|
||||
from mitmproxy import ctx
|
||||
from mitmproxy import http
|
||||
from mitmproxy import tcp
|
||||
from mitmproxy.tools.console import common
|
||||
from mitmproxy.tools.console import layoutwidget
|
||||
from mitmproxy.tools.console import flowdetailview
|
||||
@ -24,8 +26,8 @@ class SearchError(Exception):
|
||||
class FlowViewHeader(urwid.WidgetWrap):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
master: "mitmproxy.tools.console.master.ConsoleMaster",
|
||||
self,
|
||||
master: "mitmproxy.tools.console.master.ConsoleMaster",
|
||||
) -> None:
|
||||
self.master = master
|
||||
self.focus_changed()
|
||||
@ -35,11 +37,8 @@ class FlowViewHeader(urwid.WidgetWrap):
|
||||
if self.master.view.focus.flow:
|
||||
self._w = common.format_flow(
|
||||
self.master.view.focus.flow,
|
||||
False,
|
||||
extended=True,
|
||||
render_mode=common.RenderMode.DETAILVIEW,
|
||||
hostheader=self.master.options.showhost,
|
||||
cols=cols,
|
||||
layout=self.master.options.console_flowlist_layout
|
||||
)
|
||||
else:
|
||||
self._w = urwid.Pile([])
|
||||
@ -52,45 +51,90 @@ class FlowDetails(tabs.Tabs):
|
||||
self.show()
|
||||
self.last_displayed_body = None
|
||||
|
||||
def focus_changed(self):
|
||||
if self.master.view.focus.flow:
|
||||
self.tabs = [
|
||||
(self.tab_request, self.view_request),
|
||||
(self.tab_response, self.view_response),
|
||||
(self.tab_details, self.view_details),
|
||||
]
|
||||
self.show()
|
||||
else:
|
||||
self.master.window.pop()
|
||||
|
||||
@property
|
||||
def view(self):
|
||||
return self.master.view
|
||||
|
||||
@property
|
||||
def flow(self):
|
||||
def flow(self) -> mitmproxy.flow.Flow:
|
||||
return self.master.view.focus.flow
|
||||
|
||||
def tab_request(self):
|
||||
if self.flow.intercepted and not self.flow.response:
|
||||
def focus_changed(self):
|
||||
if self.flow:
|
||||
if isinstance(self.flow, http.HTTPFlow):
|
||||
self.tabs = [
|
||||
(self.tab_http_request, self.view_request),
|
||||
(self.tab_http_response, self.view_response),
|
||||
(self.tab_details, self.view_details),
|
||||
]
|
||||
elif isinstance(self.flow, tcp.TCPFlow):
|
||||
self.tabs = [
|
||||
(self.tab_tcp_stream, self.view_tcp_stream),
|
||||
(self.tab_details, self.view_details),
|
||||
]
|
||||
self.show()
|
||||
else:
|
||||
self.master.window.pop()
|
||||
|
||||
def tab_http_request(self):
|
||||
flow = self.flow
|
||||
assert isinstance(flow, http.HTTPFlow)
|
||||
if self.flow.intercepted and not flow.response:
|
||||
return "Request intercepted"
|
||||
else:
|
||||
return "Request"
|
||||
|
||||
def tab_response(self):
|
||||
if self.flow.intercepted and self.flow.response:
|
||||
def tab_http_response(self):
|
||||
flow = self.flow
|
||||
assert isinstance(flow, http.HTTPFlow)
|
||||
if self.flow.intercepted and flow.response:
|
||||
return "Response intercepted"
|
||||
else:
|
||||
return "Response"
|
||||
|
||||
def tab_tcp_stream(self):
|
||||
return "TCP Stream"
|
||||
|
||||
def tab_details(self):
|
||||
return "Detail"
|
||||
|
||||
def view_request(self):
|
||||
return self.conn_text(self.flow.request)
|
||||
flow = self.flow
|
||||
assert isinstance(flow, http.HTTPFlow)
|
||||
return self.conn_text(flow.request)
|
||||
|
||||
def view_response(self):
|
||||
return self.conn_text(self.flow.response)
|
||||
flow = self.flow
|
||||
assert isinstance(flow, http.HTTPFlow)
|
||||
return self.conn_text(flow.response)
|
||||
|
||||
def view_tcp_stream(self) -> urwid.Widget:
|
||||
flow = self.flow
|
||||
assert isinstance(flow, tcp.TCPFlow)
|
||||
|
||||
if not flow.messages:
|
||||
return searchable.Searchable([urwid.Text(("highlight", "No messages."))])
|
||||
|
||||
from_client = None
|
||||
messages = []
|
||||
for message in flow.messages:
|
||||
if message.from_client is not from_client:
|
||||
messages.append(message.content)
|
||||
from_client = message.from_client
|
||||
else:
|
||||
messages[-1] += message.content
|
||||
|
||||
from_client = flow.messages[0].from_client
|
||||
parts = []
|
||||
for message in messages:
|
||||
parts.append(
|
||||
(
|
||||
"head" if from_client else "key",
|
||||
message
|
||||
)
|
||||
)
|
||||
from_client = not from_client
|
||||
return searchable.Searchable([urwid.Text(parts)])
|
||||
|
||||
def view_details(self):
|
||||
return flowdetailview.flowdetails(self.view, self.flow)
|
||||
@ -229,7 +273,7 @@ class FlowView(urwid.Frame, layoutwidget.LayoutWidget):
|
||||
def __init__(self, master):
|
||||
super().__init__(
|
||||
FlowDetails(master),
|
||||
header = FlowViewHeader(master),
|
||||
header=FlowViewHeader(master),
|
||||
)
|
||||
self.master = master
|
||||
|
||||
|
@ -22,9 +22,8 @@ class Palette:
|
||||
'option_selected_key',
|
||||
|
||||
# List and Connections
|
||||
'method',
|
||||
'method_get', 'method_post', 'method_delete', 'method_other', 'method_head', 'method_put', 'method_http2_push',
|
||||
'scheme_http', 'scheme_https', 'scheme_other',
|
||||
'scheme_http', 'scheme_https', 'scheme_tcp', 'scheme_other',
|
||||
'url_punctuation', 'url_domain', 'url_filename', 'url_extension', 'url_query_key', 'url_query_value',
|
||||
'content_none', 'content_text', 'content_script', 'content_media', 'content_data', 'content_raw', 'content_other',
|
||||
'focus',
|
||||
@ -121,7 +120,6 @@ class LowDark(Palette):
|
||||
option_active_selected = ('light red', 'light gray'),
|
||||
|
||||
# List and Connections
|
||||
method = ('dark cyan', 'default'),
|
||||
method_get = ('light green', 'default'),
|
||||
method_post = ('brown', 'default'),
|
||||
method_delete = ('light red', 'default'),
|
||||
@ -132,6 +130,7 @@ class LowDark(Palette):
|
||||
|
||||
scheme_http = ('dark cyan', 'default'),
|
||||
scheme_https = ('dark green', 'default'),
|
||||
scheme_tcp=('dark magenta', 'default'),
|
||||
scheme_other = ('dark magenta', 'default'),
|
||||
|
||||
url_punctuation = ('light gray', 'default'),
|
||||
@ -221,7 +220,6 @@ class LowLight(Palette):
|
||||
option_active_selected = ('light red', 'light gray'),
|
||||
|
||||
# List and Connections
|
||||
method = ('dark cyan', 'default'),
|
||||
method_get = ('dark green', 'default'),
|
||||
method_post = ('brown', 'default'),
|
||||
method_head = ('dark cyan', 'default'),
|
||||
@ -232,6 +230,7 @@ class LowLight(Palette):
|
||||
|
||||
scheme_http = ('dark cyan', 'default'),
|
||||
scheme_https = ('light green', 'default'),
|
||||
scheme_tcp=('light magenta', 'default'),
|
||||
scheme_other = ('light magenta', 'default'),
|
||||
|
||||
url_punctuation = ('dark gray', 'default'),
|
||||
@ -340,7 +339,6 @@ class SolarizedLight(LowLight):
|
||||
|
||||
# List and Connections
|
||||
|
||||
method = ('dark cyan', 'default'),
|
||||
method_get = (sol_green, 'default'),
|
||||
method_post = (sol_orange, 'default'),
|
||||
method_head = (sol_cyan, 'default'),
|
||||
@ -351,6 +349,7 @@ class SolarizedLight(LowLight):
|
||||
|
||||
scheme_http = (sol_cyan, 'default'),
|
||||
scheme_https = ('light green', 'default'),
|
||||
scheme_tcp=('light magenta', 'default'),
|
||||
scheme_other = ('light magenta', 'default'),
|
||||
|
||||
url_punctuation = ('dark gray', 'default'),
|
||||
@ -416,7 +415,6 @@ class SolarizedDark(LowDark):
|
||||
# List and Connections
|
||||
focus = (sol_base1, 'default'),
|
||||
|
||||
method = (sol_cyan, 'default'),
|
||||
method_get = (sol_green, 'default'),
|
||||
method_post = (sol_orange, 'default'),
|
||||
method_delete = (sol_red, 'default'),
|
||||
|
@ -144,6 +144,9 @@ class TestClientPlayback:
|
||||
f.request.raw_content = None
|
||||
assert "missing content" in cp.check(f)
|
||||
|
||||
f = tflow.ttcpflow()
|
||||
assert "Can only replay HTTP" in cp.check(f)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_playback(self):
|
||||
cp = clientplayback.ClientPlayback()
|
||||
|
@ -36,7 +36,7 @@ def test_order_refresh():
|
||||
assert sargs
|
||||
|
||||
|
||||
def test_order_generators():
|
||||
def test_order_generators_http():
|
||||
v = view.View()
|
||||
tf = tflow.tflow(resp=True)
|
||||
|
||||
@ -53,6 +53,23 @@ def test_order_generators():
|
||||
assert sz.generate(tf) == len(tf.request.raw_content) + len(tf.response.raw_content)
|
||||
|
||||
|
||||
def test_order_generators_tcp():
|
||||
v = view.View()
|
||||
tf = tflow.ttcpflow()
|
||||
|
||||
rs = view.OrderRequestStart(v)
|
||||
assert rs.generate(tf) == 946681200
|
||||
|
||||
rm = view.OrderRequestMethod(v)
|
||||
assert rm.generate(tf) == "TCP"
|
||||
|
||||
ru = view.OrderRequestURL(v)
|
||||
assert ru.generate(tf) == "address:22"
|
||||
|
||||
sz = view.OrderKeySize(v)
|
||||
assert sz.generate(tf) == sum(len(m.content) for m in tf.messages)
|
||||
|
||||
|
||||
def test_simple():
|
||||
v = view.View()
|
||||
f = tft(start=1)
|
||||
@ -105,6 +122,21 @@ def test_simple():
|
||||
assert len(v._store) == 0
|
||||
|
||||
|
||||
def test_simple_tcp():
|
||||
v = view.View()
|
||||
f = tflow.ttcpflow()
|
||||
assert v.store_count() == 0
|
||||
v.tcp_start(f)
|
||||
assert list(v) == [f]
|
||||
|
||||
# These all just call update
|
||||
v.tcp_start(f)
|
||||
v.tcp_message(f)
|
||||
v.tcp_error(f)
|
||||
v.tcp_end(f)
|
||||
assert list(v) == [f]
|
||||
|
||||
|
||||
def test_filter():
|
||||
v = view.View()
|
||||
v.request(tft(method="get"))
|
||||
|
@ -254,6 +254,10 @@ class TestHTTPFlow:
|
||||
f.response.decode()
|
||||
assert f.response.raw_content == b"abarb"
|
||||
|
||||
def test_timestamp_start(self):
|
||||
f = tflow.tflow()
|
||||
assert f.timestamp_start == f.request.timestamp_start
|
||||
|
||||
|
||||
def test_make_error_response():
|
||||
resp = http.make_error_response(543, 'foobar', Headers())
|
||||
|
@ -5,10 +5,16 @@ from mitmproxy.tools.console import common
|
||||
|
||||
|
||||
def test_format_flow():
|
||||
f = tflow.tflow(resp=True)
|
||||
assert common.format_flow(f, True)
|
||||
assert common.format_flow(f, True, hostheader=True)
|
||||
assert common.format_flow(f, True, extended=True)
|
||||
flows = [
|
||||
tflow.tflow(resp=True),
|
||||
tflow.tflow(err=True),
|
||||
tflow.ttcpflow(),
|
||||
tflow.ttcpflow(err=True),
|
||||
]
|
||||
for f in flows:
|
||||
for render_mode in common.RenderMode:
|
||||
assert common.format_flow(f, render_mode=render_mode)
|
||||
assert common.format_flow(f, render_mode=render_mode, hostheader=True, focused=False)
|
||||
|
||||
|
||||
def test_format_keyvals():
|
||||
@ -26,7 +32,7 @@ def test_format_keyvals():
|
||||
)
|
||||
), 1
|
||||
)
|
||||
assert wrapped.render((30, ))
|
||||
assert wrapped.render((30,))
|
||||
assert common.format_keyvals(
|
||||
[
|
||||
("aa", wrapped)
|
||||
|
Loading…
Reference in New Issue
Block a user