addons: dumper spit and polish

- 100% test coverage
- Cleanups
- Add test/mitmproxy/addons/dumperview.py, a small utility for viewing dumper
output variations
This commit is contained in:
Aldo Cortesi 2016-11-02 11:58:27 +13:00
parent a75b3474a4
commit b867fb35a3
4 changed files with 222 additions and 91 deletions

View File

@ -12,12 +12,18 @@ from mitmproxy.utils import human
from mitmproxy.utils import strutils
def indent(n, text):
def indent(n: int, text: str) -> str:
l = str(text).strip().splitlines()
pad = " " * n
return "\n".join(pad + i for i in l)
def colorful(line, styles):
yield u" " # we can already indent here
for (style, text) in line:
yield click.style(text, **styles.get(style, {}))
class Dumper:
def __init__(self):
self.filter = None # type: flowfilter.TFilter
@ -27,14 +33,15 @@ class Dumper:
self.default_contentview = "auto" # type: str
def configure(self, options, updated):
if options.filtstr:
self.filter = flowfilter.parse(options.filtstr)
if not self.filter:
raise exceptions.OptionsError(
"Invalid filter expression: %s" % options.filtstr
)
else:
self.filter = None
if "filtstr" in updated:
if options.filtstr:
self.filter = flowfilter.parse(options.filtstr)
if not self.filter:
raise exceptions.OptionsError(
"Invalid filter expression: %s" % options.filtstr
)
else:
self.filter = None
self.flow_detail = options.flow_detail
self.outfp = options.tfile
self.showhost = options.showhost
@ -47,67 +54,51 @@ class Dumper:
if self.outfp:
self.outfp.flush()
def _echo_message(self, message):
if self.flow_detail >= 2 and hasattr(message, "headers"):
headers = "\r\n".join(
"{}: {}".format(
click.style(
strutils.bytes_to_escaped_str(k), fg="blue", bold=True
),
click.style(
strutils.bytes_to_escaped_str(v), fg="blue"
)
)
for k, v in message.headers.fields
def _echo_headers(self, headers):
for k, v in headers.fields:
k = strutils.bytes_to_escaped_str(k)
v = strutils.bytes_to_escaped_str(v)
out = "{}: {}".format(
click.style(k, fg="blue"),
click.style(v)
)
self.echo(headers, ident=4)
if self.flow_detail >= 3:
_, lines, error = contentviews.get_message_content_view(
self.default_contentview,
message
)
if error:
ctx.log.debug(error)
self.echo(out, ident=4)
styles = dict(
highlight=dict(bold=True),
offset=dict(fg="blue"),
header=dict(fg="green", bold=True),
text=dict(fg="green")
)
def _echo_message(self, message):
_, lines, error = contentviews.get_message_content_view(
self.default_contentview,
message
)
if error:
ctx.log.debug(error)
def colorful(line):
yield u" " # we can already indent here
for (style, text) in line:
yield click.style(text, **styles.get(style, {}))
if self.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)
else:
lines_to_echo = lines
if self.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)
else:
lines_to_echo = lines
styles = dict(
highlight=dict(bold=True),
offset=dict(fg="blue"),
header=dict(fg="green", bold=True),
text=dict(fg="green")
)
content = u"\r\n".join(
u"".join(colorful(line)) for line in lines_to_echo
)
if content:
self.echo("")
self.echo(content)
content = u"\r\n".join(
u"".join(colorful(line, styles)) for line in lines_to_echo
)
if content:
self.echo("")
self.echo(content)
if next(lines, None):
self.echo("(cut off)", ident=4, dim=True)
if next(lines, None):
self.echo("(cut off)", ident=4, dim=True)
if self.flow_detail >= 2:
self.echo("")
def _echo_request_line(self, flow):
if flow.request.stickycookie:
stickycookie = click.style(
"[stickycookie] ", fg="yellow", bold=True
)
else:
stickycookie = ""
if flow.client_conn:
if flow.client_conn is not None:
client = click.style(
strutils.escape_control_characters(
repr(flow.client_conn.address)
@ -133,8 +124,6 @@ class Dumper:
url = flow.request.pretty_url
else:
url = flow.request.url
if len(url) > 200:
url = url[:199] + ""
url = click.style(strutils.escape_control_characters(url), bold=True)
http_version = ""
@ -142,15 +131,8 @@ class Dumper:
# We hide "normal" HTTP 1.
http_version = " " + flow.request.http_version
if self.flow_detail >= 2:
linebreak = "\n "
else:
linebreak = ""
line = "{client}: {linebreak}{stickycookie}{method} {url}{http_version}".format(
line = "{client}: {method} {url}{http_version}".format(
client=client,
stickycookie=stickycookie,
linebreak=linebreak,
method=method,
url=url,
http_version=http_version
@ -208,11 +190,17 @@ class Dumper:
def echo_flow(self, f):
if f.request:
self._echo_request_line(f)
self._echo_message(f.request)
if self.flow_detail >= 2:
self._echo_headers(f.request.headers)
if self.flow_detail >= 3:
self._echo_message(f.request)
if f.response:
self._echo_response_line(f)
self._echo_message(f.response)
if self.flow_detail >= 2:
self._echo_headers(f.response.headers)
if self.flow_detail >= 3:
self._echo_message(f.response)
if f.error:
msg = strutils.escape_control_characters(f.error.msg)
@ -244,13 +232,12 @@ class Dumper:
)
def tcp_message(self, f):
if not self.match(f):
return
message = f.messages[-1]
direction = "->" if message.from_client else "<-"
self.echo("{client} {direction} tcp {direction} {server}".format(
client=repr(f.client_conn.address),
server=repr(f.server_conn.address),
direction=direction,
))
self._echo_message(message)
if self.match(f):
message = f.messages[-1]
direction = "->" if message.from_client else "<-"
self.echo("{client} {direction} tcp {direction} {server}".format(
client=repr(f.client_conn.address),
server=repr(f.server_conn.address),
direction=direction,
))
self._echo_message(message)

View File

@ -1,4 +1,4 @@
import mitmproxy.test
from mitmproxy.test import tutils
from mitmproxy import tcp
from mitmproxy import controller
from mitmproxy import http
@ -40,9 +40,9 @@ def tflow(client_conn=True, server_conn=True, req=True, resp=None, err=None):
if server_conn is True:
server_conn = tserver_conn()
if req is True:
req = mitmproxy.test.tutils.treq()
req = tutils.treq()
if resp is True:
resp = mitmproxy.test.tutils.tresp()
resp = tutils.tresp()
if err is True:
err = terr()

View File

@ -0,0 +1,57 @@
#!/usr/bin/env python
import click
from mitmproxy.addons import dumper
from mitmproxy.test import tflow
from mitmproxy.test import taddons
from mitmproxy.tools import dump
def show(flow_detail, flows):
d = dumper.Dumper()
with taddons.context(options=dump.Options()) as ctx:
ctx.configure(d, flow_detail=flow_detail)
for f in flows:
ctx.cycle(d, f)
@click.group()
def cli():
pass
@cli.command()
@click.option('--level', default=1, help='Detail level')
def tcp(level):
f1 = tflow.ttcpflow(client_conn=True, server_conn=True)
show(level, [f1])
@cli.command()
@click.option('--level', default=1, help='Detail level')
def large(level):
f1 = tflow.tflow(client_conn=True, server_conn=True, resp=True)
f1.response.headers["content-type"] = "text/html"
f1.response.content = b"foo bar voing\n" * 100
show(level, [f1])
@cli.command()
@click.option('--level', default=1, help='Detail level')
def small(level):
f1 = tflow.tflow(client_conn=True, server_conn=True, resp=True)
f1.response.headers["content-type"] = "text/html"
f1.response.content = b"<html><body>Hello!</body></html>"
f2 = tflow.tflow(client_conn=True, server_conn=True, err=True)
show(
level,
[
f1, f2,
]
)
if __name__ == "__main__":
cli()

View File

@ -1,66 +1,138 @@
import io
from mitmproxy.test import tflow
from mitmproxy.test import taddons
from mitmproxy.test import tutils
from mitmproxy.addons import dumper
from mitmproxy import exceptions
from mitmproxy.tools import dump
from mitmproxy import http
import mitmproxy.test.tutils
import mock
def test_configure():
d = dumper.Dumper()
with taddons.context(options=dump.Options()) as ctx:
ctx.configure(d, filtstr="~b foo")
assert d.filter
f = tflow.tflow(resp=True)
assert not d.match(f)
f.response.content = b"foo"
assert d.match(f)
ctx.configure(d, filtstr=None)
assert not d.filter
tutils.raises(exceptions.OptionsError, ctx.configure, d, filtstr="~~")
assert not d.filter
def test_simple():
d = dumper.Dumper()
with taddons.context(options=dump.Options()) as ctx:
sio = io.StringIO()
ctx.configure(d, tfile = sio, flow_detail = 0)
d.response(tflow.tflow())
d.response(tflow.tflow(resp=True))
assert not sio.getvalue()
sio.truncate(0)
ctx.configure(d, tfile = sio, flow_detail = 1)
d.response(tflow.tflow(resp=True))
assert sio.getvalue()
sio.truncate(0)
ctx.configure(d, tfile = sio, flow_detail = 1)
d.error(tflow.tflow(err=True))
assert sio.getvalue()
sio.truncate(0)
ctx.configure(d, tfile = sio, flow_detail = 4)
d.response(tflow.tflow())
d.response(tflow.tflow(resp=True))
assert sio.getvalue()
sio.truncate(0)
sio = io.StringIO()
ctx.configure(d, tfile = sio, flow_detail = 4)
d.response(tflow.tflow(resp=True))
assert "<<" in sio.getvalue()
sio.truncate(0)
sio = io.StringIO()
ctx.configure(d, tfile = sio, flow_detail = 4)
d.response(tflow.tflow(err=True))
assert "<<" in sio.getvalue()
sio.truncate(0)
sio = io.StringIO()
ctx.configure(d, tfile = sio, flow_detail = 4)
flow = tflow.tflow()
flow.request = mitmproxy.test.tutils.treq()
flow.request = tutils.treq()
flow.request.stickycookie = True
flow.client_conn = mock.MagicMock()
flow.client_conn.address.host = "foo"
flow.response = mitmproxy.test.tutils.tresp(content=None)
flow.response = tutils.tresp(content=None)
flow.response.is_replay = True
flow.response.status_code = 300
d.response(flow)
assert sio.getvalue()
sio.truncate(0)
sio = io.StringIO()
ctx.configure(d, tfile = sio, flow_detail = 4)
flow = tflow.tflow(resp=mitmproxy.test.tutils.tresp(content=b"{"))
flow = tflow.tflow(resp=tutils.tresp(content=b"{"))
flow.response.headers["content-type"] = "application/json"
flow.response.status_code = 400
d.response(flow)
assert sio.getvalue()
sio.truncate(0)
sio = io.StringIO()
ctx.configure(d, tfile = sio, flow_detail = 4)
flow = tflow.tflow()
flow.request.content = None
flow.response = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp())
flow.response = http.HTTPResponse.wrap(tutils.tresp())
flow.response.content = None
d.response(flow)
assert "content missing" in sio.getvalue()
sio.truncate(0)
def test_echo_body():
f = tflow.tflow(client_conn=True, server_conn=True, resp=True)
f.response.headers["content-type"] = "text/html"
f.response.content = b"foo bar voing\n" * 100
d = dumper.Dumper()
sio = io.StringIO()
with taddons.context(options=dump.Options()) as ctx:
ctx.configure(d, tfile=sio, flow_detail = 3)
d._echo_message(f.response)
t = sio.getvalue()
assert "cut off" in t
def test_echo_request_line():
d = dumper.Dumper()
sio = io.StringIO()
with taddons.context(options=dump.Options()) as ctx:
ctx.configure(d, tfile=sio, flow_detail = 3, showhost = True)
f = tflow.tflow(client_conn=None, server_conn=True, resp=True)
f.request.is_replay = True
d._echo_request_line(f)
assert "[replay]" in sio.getvalue()
sio.truncate(0)
f = tflow.tflow(client_conn=None, server_conn=True, resp=True)
f.request.is_replay = False
d._echo_request_line(f)
assert "[replay]" not in sio.getvalue()
sio.truncate(0)
f = tflow.tflow(client_conn=None, server_conn=True, resp=True)
f.request.http_version = "nonstandard"
d._echo_request_line(f)
assert "nonstandard" in sio.getvalue()
sio.truncate(0)
class TestContentView:
@ -73,3 +145,18 @@ class TestContentView:
ctx.configure(d, flow_detail=4, verbosity=3, tfile=sio)
d.response(tflow.tflow())
assert "Content viewer failed" in ctx.master.event_log[0][1]
def test_tcp():
d = dumper.Dumper()
sio = io.StringIO()
with taddons.context(options=dump.Options()) as ctx:
ctx.configure(d, tfile=sio, flow_detail = 3, showhost = True)
f = tflow.ttcpflow(client_conn=True, server_conn=True)
d.tcp_message(f)
assert "it's me" in sio.getvalue()
sio.truncate(0)
f = tflow.ttcpflow(client_conn=True, err=True)
d.tcp_error(f)
assert "Error in TCP" in sio.getvalue()