mitmdump: colorize output, add content views

This commit is contained in:
Maximilian Hils 2015-09-11 19:03:50 +02:00
parent b7b46bac46
commit 9c31669211
3 changed files with 197 additions and 86 deletions

View File

@ -1,14 +1,16 @@
from __future__ import absolute_import, print_function from __future__ import absolute_import, print_function
import json
import sys import sys
import os import os
import traceback
import click
from netlib.http.semantics import CONTENT_MISSING from netlib.http.semantics import CONTENT_MISSING
import netlib.utils import netlib.utils
from . import flow, filt, contentview
from . import flow, filt, utils from .exceptions import ContentViewException
from .protocol import http from .models import HTTPRequest
class DumpError(Exception): class DumpError(Exception):
pass pass
@ -55,24 +57,6 @@ class Options(object):
setattr(self, i, None) setattr(self, i, None)
def str_response(resp):
r = "%s %s" % (resp.code, resp.msg)
if resp.is_replay:
r = "[replay] " + r
return r
def str_request(f, showhost):
if f.client_conn:
c = f.client_conn.address.host
else:
c = "[replay]"
r = "%s %s %s" % (c, f.request.method, f.request.pretty_url(showhost))
if f.request.stickycookie:
r = "[stickycookie] " + r
return r
class DumpMaster(flow.FlowMaster): class DumpMaster(flow.FlowMaster):
def __init__(self, server, options, outfile=sys.stdout): def __init__(self, server, options, outfile=sys.stdout):
flow.FlowMaster.__init__(self, server, flow.State()) flow.FlowMaster.__init__(self, server, flow.State())
@ -163,72 +147,161 @@ class DumpMaster(flow.FlowMaster):
def add_event(self, e, level="info"): def add_event(self, e, level="info"):
needed = dict(error=0, info=1, debug=2).get(level, 1) needed = dict(error=0, info=1, debug=2).get(level, 1)
if self.o.verbosity >= needed: if self.o.verbosity >= needed:
print(e, file=self.outfile) self.echo(
self.outfile.flush() e,
fg="red" if level == "error" else None,
dim=(level == "debug")
)
@staticmethod @staticmethod
def indent(n, t): def indent(n, text):
l = str(t).strip().splitlines() l = str(text).strip().splitlines()
pad = " " * n pad = " " * n
return "\n".join(pad + i for i in l) return "\n".join(pad + i for i in l)
def _print_message(self, message): def echo(self, text, indent=None, **style):
if indent:
text = self.indent(indent, text)
click.secho(text, file=self.outfile, **style)
def _echo_message(self, message):
if self.o.flow_detail >= 2: if self.o.flow_detail >= 2:
print(self.indent(4, str(message.headers)), file=self.outfile) headers = "\r\n".join(
"{}: {}".format(
click.style(k, fg="blue", bold=True),
click.style(v, fg="blue"))
for k, v in message.headers.fields
)
self.echo(headers, indent=4)
if self.o.flow_detail >= 3: if self.o.flow_detail >= 3:
if message.content == CONTENT_MISSING: if message.content == CONTENT_MISSING:
print(self.indent(4, "(content missing)"), file=self.outfile) self.echo("(content missing)", indent=4)
elif message.content: elif message.content:
print("", file=self.outfile) self.echo("")
content = message.get_decoded_content() cutoff = sys.maxsize if self.o.flow_detail >= 4 else contentview.VIEW_CUTOFF
if not utils.isBin(content): try:
try: type, lines = contentview.get_content_view(
jsn = json.loads(content) contentview.get("Auto"),
print( message.headers,
self.indent( message.body,
4, cutoff,
json.dumps( isinstance(message, HTTPRequest)
jsn, )
indent=2)), except ContentViewException:
file=self.outfile) s = "Content viewer failed: \n" + traceback.format_exc()
except ValueError: self.add_event(s, "debug")
print(self.indent(4, content), file=self.outfile) type, lines = contentview.get_content_view(
else: contentview.get("Raw"),
d = netlib.utils.hexdump(content) message.headers,
d = "\n".join("%s\t%s %s" % i for i in d) message.body,
print(self.indent(4, d), file=self.outfile) cutoff,
isinstance(message, HTTPRequest)
)
styles = dict(
highlight=dict(bold=True),
offset=dict(fg="blue"),
header=dict(fg="green", bold=True),
text=dict(fg="green")
)
def colorful(line):
yield " " # we can already indent here
for (style, text) in line:
yield click.style(text, **styles.get(style, {}))
content = "\r\n".join(
"".join(colorful(line)) for line in lines
)
self.echo(content)
if self.o.flow_detail >= 2: if self.o.flow_detail >= 2:
print("", file=self.outfile) self.echo("")
def _echo_request_line(self, flow):
if flow.request.stickycookie:
stickycookie = click.style("[stickycookie] ", fg="yellow", bold=True)
else:
stickycookie = ""
if flow.client_conn:
client = click.style(flow.client_conn.address.host, bold=True)
else:
client = click.style("[replay]", fg="yellow", bold=True)
method = flow.request.method
method_color=dict(
GET="green",
DELETE="red"
).get(method.upper(), "magenta")
method = click.style(method, fg=method_color, bold=True)
url = click.style(flow.request.pretty_url(self.showhost), bold=True)
line = "{stickycookie}{client} {method} {url}".format(
stickycookie=stickycookie,
client=client,
method=method,
url=url
)
self.echo(line)
def _echo_response_line(self, flow):
if flow.response.is_replay:
replay = click.style("[replay] ", fg="yellow", bold=True)
else:
replay = ""
code = flow.response.status_code
code_color = None
if 200 <= code < 300:
code_color = "green"
elif 300 <= code < 400:
code_color = "magenta"
elif 400 <= code < 600:
code_color = "red"
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
msg = click.style(flow.response.msg, fg=code_color, bold=True)
if flow.response.content == CONTENT_MISSING:
size = "(content missing)"
else:
size = netlib.utils.pretty_size(len(flow.response.content))
size = click.style(size, bold=True)
arrows = click.style("<<", bold=True)
line = "{replay} {arrows} {code} {msg} {size}".format(
replay=replay,
arrows=arrows,
code=code,
msg=msg,
size=size
)
self.echo(line)
def echo_flow(self, f):
if self.o.flow_detail == 0:
return
if f.request:
self._echo_request_line(f)
self._echo_message(f.request)
if f.response:
self._echo_response_line(f)
self._echo_message(f.response)
if f.error:
self.echo(" << {}".format(f.error.msg), bold=True, fg="red")
self.outfile.flush()
def _process_flow(self, f): def _process_flow(self, f):
self.state.delete_flow(f) self.state.delete_flow(f)
if self.filt and not f.match(self.filt): if self.filt and not f.match(self.filt):
return return
if self.o.flow_detail == 0: self.echo_flow(f)
return
if f.request:
print(str_request(f, self.showhost), file=self.outfile)
self._print_message(f.request)
if f.response:
if f.response.content == CONTENT_MISSING:
sz = "(content missing)"
else:
sz = netlib.utils.pretty_size(len(f.response.content))
print(
" << %s %s" %
(str_response(
f.response),
sz),
file=self.outfile)
self._print_message(f.response)
if f.error:
print(" << {}".format(f.error.msg), file=self.outfile)
self.outfile.flush()
def handle_request(self, f): def handle_request(self, f):
flow.FlowMaster.handle_request(self, f) flow.FlowMaster.handle_request(self, f)

View File

@ -23,15 +23,17 @@ deps = {
"html2text>=2015.4.14", "html2text>=2015.4.14",
"construct>=2.5.2", "construct>=2.5.2",
"six>=1.9.0", "six>=1.9.0",
"lxml>=3.3.6",
"Pillow>=2.3.0",
} }
# A script -> additional dependencies dict. # A script -> additional dependencies dict.
scripts = { scripts = {
"mitmproxy": { "mitmproxy": {
"urwid>=1.3", "urwid>=1.3",
"lxml>=3.3.6",
"Pillow>=2.3.0",
}, },
"mitmdump": set(), "mitmdump": {
"click>=5.1",
},
"mitmweb": set() "mitmweb": set()
} }
# Developer dependencies # Developer dependencies

View File

@ -1,5 +1,7 @@
import os import os
from cStringIO import StringIO from cStringIO import StringIO
from libmproxy.contentview import ViewAuto
from libmproxy.exceptions import ContentViewException
from libmproxy.models import HTTPResponse from libmproxy.models import HTTPResponse
import netlib.tutils import netlib.tutils
@ -12,17 +14,51 @@ import mock
def test_strfuncs(): def test_strfuncs():
t = HTTPResponse.wrap(netlib.tutils.tresp()) o = dump.Options()
t.is_replay = True m = dump.DumpMaster(None, o)
dump.str_response(t)
f = tutils.tflow() m.outfile = StringIO()
f.client_conn = None m.o.flow_detail = 0
f.request.stickycookie = True m.echo_flow(tutils.tflow())
assert "stickycookie" in dump.str_request(f, False) assert not m.outfile.getvalue()
assert "stickycookie" in dump.str_request(f, True)
assert "replay" in dump.str_request(f, False) m.o.flow_detail = 4
assert "replay" in dump.str_request(f, True) m.echo_flow(tutils.tflow())
assert m.outfile.getvalue()
m.outfile = StringIO()
m.echo_flow(tutils.tflow(resp=True))
assert "<<" in m.outfile.getvalue()
m.outfile = StringIO()
m.echo_flow(tutils.tflow(err=True))
assert "<<" in m.outfile.getvalue()
flow = tutils.tflow()
flow.request = netlib.tutils.treq()
flow.request.stickycookie = True
flow.client_conn = mock.MagicMock()
flow.client_conn.address.host = "foo"
flow.response = netlib.tutils.tresp(content=CONTENT_MISSING)
flow.response.is_replay = True
flow.response.code = 300
m.echo_flow(flow)
flow = tutils.tflow(resp=netlib.tutils.tresp("{"))
flow.response.headers["content-type"] = "application/json"
flow.response.code = 400
m.echo_flow(flow)
@mock.patch("libmproxy.contentview.get_content_view")
def test_contentview(get_content_view):
get_content_view.side_effect = ContentViewException(""), ("x", [])
o = dump.Options(flow_detail=4, verbosity=3)
m = dump.DumpMaster(None, o, StringIO())
m.echo_flow(tutils.tflow())
assert "Content viewer failed" in m.outfile.getvalue()
class TestDumpMaster: class TestDumpMaster: