Merge pull request #765 from mitmproxy/such-colors-very-wow

Improve Content Views
This commit is contained in:
Maximilian Hils 2015-09-12 17:57:21 +02:00
commit 32b487109f
11 changed files with 437 additions and 298 deletions

View File

@ -14,7 +14,7 @@ import traceback
import urwid
import weakref
from .. import controller, flow, script, contentview
from .. import controller, flow, script, contentviews
from . import flowlist, flowview, help, window, signals, options
from . import grideditor, palettes, statusbar, palettepicker
@ -26,7 +26,7 @@ class ConsoleState(flow.State):
flow.State.__init__(self)
self.focus = None
self.follow_focus = None
self.default_body_view = contentview.get("Auto")
self.default_body_view = contentviews.get("Auto")
self.flowsettings = weakref.WeakKeyDictionary()
self.last_search = None
@ -648,7 +648,7 @@ class ConsoleMaster(flow.FlowMaster):
return self.state.set_intercept(txt)
def change_default_display_mode(self, t):
v = contentview.get_by_shortcut(t)
v = contentviews.get_by_shortcut(t)
self.state.default_body_view = v
self.refresh_focus()

View File

@ -1,15 +1,15 @@
from __future__ import absolute_import
import os
import sys
import traceback
import sys
import urwid
from netlib import odict
from netlib.http.semantics import CONTENT_MISSING, Headers
from . import common, grideditor, signals, searchable, tabs
from . import flowdetailview
from .. import utils, controller, contentview
from .. import utils, controller, contentviews
from ..models import HTTPRequest, HTTPResponse, decoded
from ..exceptions import ContentViewException
@ -167,10 +167,10 @@ class FlowView(tabs.Tabs):
if flow == self.flow:
self.show()
def content_view(self, viewmode, conn):
if conn.content == CONTENT_MISSING:
def content_view(self, viewmode, message):
if message.body == CONTENT_MISSING:
msg, body = "", [urwid.Text([("error", "[content missing]")])]
return (msg, body)
return msg, body
else:
full = self.state.get_flow_setting(
self.flow,
@ -180,29 +180,43 @@ class FlowView(tabs.Tabs):
if full:
limit = sys.maxsize
else:
limit = contentview.VIEW_CUTOFF
limit = contentviews.VIEW_CUTOFF
return cache.get(
self._get_content_view,
viewmode,
conn.headers,
conn.content,
limit,
isinstance(conn, HTTPRequest)
message,
limit
)
def _get_content_view(self, viewmode, headers, content, limit, is_request):
def _get_content_view(self, viewmode, message, max_lines):
try:
description, lines = contentview.get_content_view(
viewmode, headers, content, limit, is_request
description, lines = contentviews.get_content_view(
viewmode, message.body, headers=message.headers
)
except ContentViewException:
s = "Content viewer failed: \n" + traceback.format_exc()
signals.add_event(s, "error")
description, lines = contentview.get_content_view(
contentview.get("Raw"), headers, content, limit, is_request
description, lines = contentviews.get_content_view(
contentviews.get("Raw"), message.body, headers=message.headers
)
description = description.replace("Raw", "Couldn't parse: falling back to Raw")
text_objects = [urwid.Text(l) for l in lines]
# Give hint that you have to tab for the response.
if description == "No content" and isinstance(message, HTTPRequest):
description = "No request content (press tab to view response)"
text_objects = []
for line in lines:
text_objects.append(urwid.Text(line))
if len(text_objects) == max_lines:
text_objects.append(urwid.Text([
("highlight", "Stopped displaying data after %d lines. Press " % max_lines),
("key", "f"),
("highlight", " to load all data.")
]))
break
return description, text_objects
def viewmode_get(self):
@ -227,9 +241,7 @@ class FlowView(tabs.Tabs):
[
("heading", msg),
]
)
]
cols.append(
),
urwid.Text(
[
" ",
@ -239,7 +251,7 @@ class FlowView(tabs.Tabs):
],
align="right"
)
)
]
title = urwid.AttrWrap(urwid.Columns(cols), "heading")
txt.append(title)
@ -471,7 +483,7 @@ class FlowView(tabs.Tabs):
self.state.add_flow_setting(
self.flow,
(self.tab_offset, "prettyview"),
contentview.get_by_shortcut(t)
contentviews.get_by_shortcut(t)
)
signals.flow_change.send(self, flow = self.flow)
@ -611,7 +623,7 @@ class FlowView(tabs.Tabs):
scope = "s"
common.ask_copy_part(scope, self.flow, self.master, self.state)
elif key == "m":
p = list(contentview.view_prompts)
p = list(contentviews.view_prompts)
p.insert(0, ("Clear", "C"))
signals.status_prompt_onekey.send(
self,

View File

@ -1,6 +1,6 @@
import urwid
from .. import contentview
from .. import contentviews
from . import common, signals, grideditor
from . import select, palettes
@ -158,7 +158,7 @@ class Options(urwid.WidgetWrap):
self.master.scripts = []
self.master.set_stickyauth(None)
self.master.set_stickycookie(None)
self.master.state.default_body_view = contentview.get("Auto")
self.master.state.default_body_view = contentviews.get("Auto")
signals.update_settings.send(self)
signals.status_message.send(
@ -233,7 +233,7 @@ class Options(urwid.WidgetWrap):
def default_displaymode(self):
signals.status_prompt_onekey.send(
prompt = "Global default display mode",
keys = contentview.view_prompts,
keys = contentviews.view_prompts,
callback = self.master.change_default_display_mode
)

View File

@ -1,4 +1,17 @@
from __future__ import absolute_import
"""
Mitmproxy Content Views
=======================
mitmproxy includes a set of content views which can be used to format/decode/highlight data.
While they are currently used for HTTP message bodies only, the may be used in other contexts
in the future, e.g. to decode protobuf messages sent as WebSocket frames.
Thus, the View API is very minimalistic. The only arguments are `data` and `**metadata`,
where `data` is the actual content (as bytes). The contents on metadata depend on the protocol in
use. For HTTP, the message headers are passed as the ``headers`` keyword argument.
"""
from __future__ import (absolute_import, print_function, division)
import cStringIO
import json
import logging
@ -8,14 +21,14 @@ import sys
import lxml.html
import lxml.etree
from PIL import Image
from PIL.ExifTags import TAGS
import html2text
import six
from netlib.odict import ODict
from netlib import encoding
import netlib.utils
from netlib.utils import clean_bin, hexdump, urldecode, multipartdecode, parse_content_type
from . import utils
from .exceptions import ContentViewException
from .contrib import jsbeautifier
@ -39,13 +52,15 @@ else:
cssutils.ser.prefs.indentClosingBrace = False
cssutils.ser.prefs.validOnly = False
VIEW_CUTOFF = 1024 * 50
# Default view cutoff *in lines*
VIEW_CUTOFF = 512
KEY_MAX = 30
def format_dict(d):
"""
Transforms the given dictionary into a list of
Helper function that transforms the given dictionary into a list of
("key", key )
("value", value)
tuples, where key is padded to a uniform width.
@ -61,39 +76,38 @@ def format_dict(d):
]
def format_text(content, limit):
def format_text(text):
"""
Transforms the given content into
Helper function that transforms bytes into the view output format.
"""
content = netlib.utils.cleanBin(content)
for line in content[:limit].splitlines():
for line in text.splitlines():
yield [("text", line)]
for msg in trailer(content, limit):
yield msg
def trailer(content, limit):
bytes_removed = len(content) - limit
if bytes_removed > 0:
yield [
("cutoff", "... {} of data not shown.".format(netlib.utils.pretty_size(bytes_removed)))
]
class View(object):
name = None
prompt = ()
content_types = []
def __call__(self, hdrs, content, limit):
def __call__(self, data, **metadata):
"""
Transform raw data into human-readable output.
Args:
data: the data to decode/format as bytes.
metadata: optional keyword-only arguments for metadata. Implementations must not
rely on a given argument being present.
Returns:
A (description, content generator) tuple.
The content generator yields lists of (style, text) tuples.
Iit must not yield tuples of tuples, because urwid cannot process that.
The content generator yields lists of (style, text) tuples, where each list represents
a single line. ``text`` is a unfiltered byte string which may need to be escaped,
depending on the used output.
Caveats:
The content generator must not yield tuples of tuples,
because urwid cannot process that. You have to yield a *list* of tuples per line.
"""
raise NotImplementedError()
@ -103,16 +117,19 @@ class ViewAuto(View):
prompt = ("auto", "a")
content_types = []
def __call__(self, hdrs, content, limit):
ctype = hdrs.get("content-type")
def __call__(self, data, **metadata):
headers = metadata.get("headers", {})
ctype = headers.get("content-type")
if ctype:
ct = netlib.utils.parse_content_type(ctype) if ctype else None
ct = parse_content_type(ctype) if ctype else None
ct = "%s/%s" % (ct[0], ct[1])
if ct in content_types_map:
return content_types_map[ct][0](hdrs, content, limit)
elif utils.isXML(content):
return get("XML")(hdrs, content, limit)
return get("Raw")(hdrs, content, limit)
return content_types_map[ct][0](data, **metadata)
elif utils.isXML(data):
return get("XML")(data, **metadata)
if utils.isMostlyBin(data):
return get("Hex")(data)
return get("Raw")(data)
class ViewRaw(View):
@ -120,8 +137,8 @@ class ViewRaw(View):
prompt = ("raw", "r")
content_types = []
def __call__(self, hdrs, content, limit):
return "Raw", format_text(content, limit)
def __call__(self, data, **metadata):
return "Raw", format_text(data)
class ViewHex(View):
@ -130,18 +147,16 @@ class ViewHex(View):
content_types = []
@staticmethod
def _format(content, limit):
for offset, hexa, s in netlib.utils.hexdump(content[:limit]):
def _format(data):
for offset, hexa, s in hexdump(data):
yield [
("offset", offset + " "),
("text", hexa + " "),
("text", s)
]
for msg in trailer(content, limit):
yield msg
def __call__(self, hdrs, content, limit):
return "Hex", self._format(content, limit)
def __call__(self, data, **metadata):
return "Hex", self._format(data)
class ViewXML(View):
@ -149,7 +164,7 @@ class ViewXML(View):
prompt = ("xml", "x")
content_types = ["text/xml"]
def __call__(self, hdrs, content, limit):
def __call__(self, data, **metadata):
parser = lxml.etree.XMLParser(
remove_blank_text=True,
resolve_entities=False,
@ -157,7 +172,7 @@ class ViewXML(View):
recover=False
)
try:
document = lxml.etree.fromstring(content, parser)
document = lxml.etree.fromstring(data, parser)
except lxml.etree.XMLSyntaxError:
return None
docinfo = document.getroottree().docinfo
@ -183,7 +198,7 @@ class ViewXML(View):
encoding=docinfo.encoding
)
return "XML-like data", format_text(s, limit)
return "XML-like data", format_text(s)
class ViewJSON(View):
@ -191,10 +206,10 @@ class ViewJSON(View):
prompt = ("json", "s")
content_types = ["application/json"]
def __call__(self, hdrs, content, limit):
pretty_json = utils.pretty_json(content)
def __call__(self, data, **metadata):
pretty_json = utils.pretty_json(data)
if pretty_json:
return "JSON", format_text(pretty_json, limit)
return "JSON", format_text(pretty_json)
class ViewHTML(View):
@ -202,20 +217,20 @@ class ViewHTML(View):
prompt = ("html", "h")
content_types = ["text/html"]
def __call__(self, hdrs, content, limit):
if utils.isXML(content):
def __call__(self, data, **metadata):
if utils.isXML(data):
parser = lxml.etree.HTMLParser(
strip_cdata=True,
remove_blank_text=True
)
d = lxml.html.fromstring(content, parser=parser)
d = lxml.html.fromstring(data, parser=parser)
docinfo = d.getroottree().docinfo
s = lxml.etree.tostring(
d,
pretty_print=True,
doctype=docinfo.doctype
)
return "HTML", format_text(s, limit)
return "HTML", format_text(s)
class ViewHTMLOutline(View):
@ -223,13 +238,13 @@ class ViewHTMLOutline(View):
prompt = ("html outline", "o")
content_types = ["text/html"]
def __call__(self, hdrs, content, limit):
content = content.decode("utf-8")
def __call__(self, data, **metadata):
data = data.decode("utf-8")
h = html2text.HTML2Text(baseurl="")
h.ignore_images = True
h.body_width = 0
content = h.handle(content)
return "HTML Outline", format_text(content, limit)
outline = h.handle(data)
return "HTML Outline", format_text(outline)
class ViewURLEncoded(View):
@ -237,8 +252,8 @@ class ViewURLEncoded(View):
prompt = ("urlencoded", "u")
content_types = ["application/x-www-form-urlencoded"]
def __call__(self, hdrs, content, limit):
d = netlib.utils.urldecode(content)
def __call__(self, data, **metadata):
d = urldecode(data)
return "URLEncoded form", format_dict(ODict(d))
@ -253,8 +268,9 @@ class ViewMultipart(View):
for message in format_dict(ODict(v)):
yield message
def __call__(self, hdrs, content, limit):
v = netlib.utils.multipartdecode(hdrs, content)
def __call__(self, data, **metadata):
headers = metadata.get("headers", {})
v = multipartdecode(headers, data)
if v:
return "Multipart form", self._format(v)
@ -308,7 +324,7 @@ if pyamf:
else:
return b
def _format(self, envelope, limit):
def _format(self, envelope):
for target, message in iter(envelope):
if isinstance(message, pyamf.remoting.Request):
yield [
@ -322,13 +338,13 @@ if pyamf:
]
s = json.dumps(self.unpack(message), indent=4)
for msg in format_text(s, limit):
for msg in format_text(s):
yield msg
def __call__(self, hdrs, content, limit):
envelope = remoting.decode(content, strict=False)
def __call__(self, data, **metadata):
envelope = remoting.decode(data, strict=False)
if envelope:
return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit)
return "AMF v%s" % envelope.amfVersion, self._format(envelope)
class ViewJavaScript(View):
@ -340,12 +356,11 @@ class ViewJavaScript(View):
"text/javascript"
]
def __call__(self, hdrs, content, limit):
def __call__(self, data, **metadata):
opts = jsbeautifier.default_options()
opts.indent_size = 2
res = jsbeautifier.beautify(content[:limit], opts)
cutoff = max(0, len(content) - limit)
return "JavaScript", format_text(res, limit - cutoff)
res = jsbeautifier.beautify(data, opts)
return "JavaScript", format_text(res)
class ViewCSS(View):
@ -355,14 +370,14 @@ class ViewCSS(View):
"text/css"
]
def __call__(self, hdrs, content, limit):
def __call__(self, data, **metadata):
if cssutils:
sheet = cssutils.parseString(content)
sheet = cssutils.parseString(data)
beautified = sheet.cssText
else:
beautified = content
beautified = data
return "CSS", format_text(beautified, limit)
return "CSS", format_text(beautified)
class ViewImage(View):
@ -376,9 +391,9 @@ class ViewImage(View):
"image/x-icon",
]
def __call__(self, hdrs, content, limit):
def __call__(self, data, **metadata):
try:
img = Image.open(cStringIO.StringIO(content))
img = Image.open(cStringIO.StringIO(data))
except IOError:
return None
parts = [
@ -399,12 +414,7 @@ class ViewImage(View):
parts.append(
(str(tag), str(ex[i]))
)
clean = []
for i in parts:
clean.append(
[netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])]
)
fmt = format_dict(ODict(clean))
fmt = format_dict(ODict(parts))
return "%s image" % img.format, fmt
@ -445,9 +455,9 @@ class ViewProtobuf(View):
else:
return err
def __call__(self, hdrs, content, limit):
decoded = self.decode_protobuf(content)
return "Protobuf", format_text(decoded, limit)
def __call__(self, data, **metadata):
decoded = self.decode_protobuf(data)
return "Protobuf", format_text(decoded)
class ViewWBXML(View):
@ -458,13 +468,13 @@ class ViewWBXML(View):
"application/vnd.ms-sync.wbxml"
]
def __call__(self, hdrs, content, limit):
def __call__(self, data, **metadata):
try:
parser = ASCommandResponse(content)
parser = ASCommandResponse(data)
parsedContent = parser.xmlString
if parsedContent:
return "WBXML", format_text(parsedContent, limit)
return "WBXML", format_text(parsedContent)
except:
return None
@ -511,29 +521,47 @@ def get(name):
return i
def get_content_view(viewmode, headers, content, limit, is_request):
def safe_to_print(lines, encoding="utf8"):
"""
Wraps a content generator so that each text portion is a *safe to print* unicode string.
"""
for line in lines:
clean_line = []
for (style, text) in line:
try:
text = clean_bin(text.decode(encoding, "strict"))
except UnicodeDecodeError:
text = clean_bin(text).decode(encoding, "strict")
clean_line.append((style, text))
yield clean_line
def get_content_view(viewmode, data, **metadata):
"""
Args:
viewmode: the view to use.
data, **metadata: arguments passed to View instance.
Returns:
A (description, content generator) tuple.
In contrast to calling the views directly, text is always safe-to-print unicode.
Raises:
ContentViewException, if the content view threw an error.
"""
if not content:
if is_request:
return "No request content (press tab to view response)", []
else:
if not data:
return "No content", []
msg = []
headers = metadata.get("headers", {})
enc = headers.get("content-encoding")
if enc and enc != "identity":
decoded = encoding.decode(enc, content)
decoded = encoding.decode(enc, data)
if decoded:
content = decoded
data = decoded
msg.append("[decoded %s]" % enc)
try:
ret = viewmode(headers, content, limit)
ret = viewmode(data, **metadata)
# Third-party viewers can fail in unexpected ways...
except Exception as e:
six.reraise(
@ -542,8 +570,8 @@ def get_content_view(viewmode, headers, content, limit, is_request):
sys.exc_info()[2]
)
if not ret:
ret = get("Raw")(headers, content, limit)
ret = get("Raw")(data, **metadata)
msg.append("Couldn't parse: falling back to Raw")
else:
msg.append(ret[0])
return " ".join(msg), ret[1]
return " ".join(msg), safe_to_print(ret[1])

View File

@ -1,14 +1,16 @@
from __future__ import absolute_import, print_function
import json
import sys
import os
import traceback
import click
import itertools
from netlib.http.semantics import CONTENT_MISSING
import netlib.utils
from . import flow, filt, utils
from .protocol import http
from . import flow, filt, contentviews
from .exceptions import ContentViewException
from .models import HTTPRequest
class DumpError(Exception):
pass
@ -55,26 +57,8 @@ class Options(object):
setattr(self, i, None)
def str_response(resp):
r = "%s %s" % (resp.code, resp.msg)
if resp.is_replay:
r = "[replay] " + r
return r
def str_request(f, showhost):
if f.client_conn:
c = f.client_conn.address.host
else:
c = "[replay]"
r = "%s %s %s" % (c, f.request.method, f.request.pretty_url(showhost))
if f.request.stickycookie:
r = "[stickycookie] " + r
return r
class DumpMaster(flow.FlowMaster):
def __init__(self, server, options, outfile=sys.stdout):
def __init__(self, server, options, outfile=None):
flow.FlowMaster.__init__(self, server, flow.State())
self.outfile = outfile
self.o = options
@ -103,7 +87,7 @@ class DumpMaster(flow.FlowMaster):
if options.outfile:
path = os.path.expanduser(options.outfile[0])
try:
f = file(path, options.outfile[1])
f = open(path, options.outfile[1])
self.start_stream(f, self.filt)
except IOError as v:
raise DumpError(v.strerror)
@ -163,72 +147,168 @@ class DumpMaster(flow.FlowMaster):
def add_event(self, e, level="info"):
needed = dict(error=0, info=1, debug=2).get(level, 1)
if self.o.verbosity >= needed:
print(e, file=self.outfile)
self.outfile.flush()
self.echo(
e,
fg="red" if level == "error" else None,
dim=(level == "debug")
)
@staticmethod
def indent(n, t):
l = str(t).strip().splitlines()
def indent(n, text):
l = str(text).strip().splitlines()
pad = " " * n
return "\n".join(pad + i for i in l)
def _print_message(self, message):
def echo(self, text, indent=None, **style):
if indent:
text = self.indent(indent, text)
click.secho(text, file=self.outfile, **style)
def _echo_message(self, message):
if self.o.flow_detail >= 2:
print(self.indent(4, str(message.headers)), file=self.outfile)
headers = "\r\n".join(
"{}: {}".format(
click.style(k, fg="blue", bold=True),
click.style(v, fg="blue"))
for k, v in message.headers.fields
)
self.echo(headers, indent=4)
if self.o.flow_detail >= 3:
if message.content == CONTENT_MISSING:
print(self.indent(4, "(content missing)"), file=self.outfile)
elif message.content:
print("", file=self.outfile)
content = message.get_decoded_content()
if not utils.isBin(content):
if message.body == CONTENT_MISSING:
self.echo("(content missing)", indent=4)
elif message.body:
self.echo("")
try:
jsn = json.loads(content)
print(
self.indent(
4,
json.dumps(
jsn,
indent=2)),
file=self.outfile)
except ValueError:
print(self.indent(4, content), file=self.outfile)
type, lines = contentviews.get_content_view(
contentviews.get("Auto"),
message.body,
headers=message.headers
)
except ContentViewException:
s = "Content viewer failed: \n" + traceback.format_exc()
self.add_event(s, "debug")
type, lines = contentviews.get_content_view(
contentviews.get("Raw"),
message.body,
headers=message.headers
)
styles = dict(
highlight=dict(bold=True),
offset=dict(fg="blue"),
header=dict(fg="green", bold=True),
text=dict(fg="green")
)
def colorful(line):
yield u" " # we can already indent here
for (style, text) in line:
yield click.style(text, **styles.get(style, {}))
if self.o.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)
else:
d = netlib.utils.hexdump(content)
d = "\n".join("%s\t%s %s" % i for i in d)
print(self.indent(4, d), file=self.outfile)
lines_to_echo = lines
lines_to_echo = list(lines_to_echo)
content = u"\r\n".join(
u"".join(colorful(line)) for line in lines_to_echo
)
self.echo(content)
if next(lines, None):
self.echo("(cut off)", indent=4, dim=True)
if self.o.flow_detail >= 2:
print("", file=self.outfile)
self.echo("")
def _echo_request_line(self, flow):
if flow.request.stickycookie:
stickycookie = click.style("[stickycookie] ", fg="yellow", bold=True)
else:
stickycookie = ""
if flow.client_conn:
client = click.style(flow.client_conn.address.host, bold=True)
else:
client = click.style("[replay]", fg="yellow", bold=True)
method = flow.request.method
method_color=dict(
GET="green",
DELETE="red"
).get(method.upper(), "magenta")
method = click.style(method, fg=method_color, bold=True)
url = click.style(flow.request.pretty_url(self.showhost), bold=True)
line = "{stickycookie}{client} {method} {url}".format(
stickycookie=stickycookie,
client=client,
method=method,
url=url
)
self.echo(line)
def _echo_response_line(self, flow):
if flow.response.is_replay:
replay = click.style("[replay] ", fg="yellow", bold=True)
else:
replay = ""
code = flow.response.status_code
code_color = None
if 200 <= code < 300:
code_color = "green"
elif 300 <= code < 400:
code_color = "magenta"
elif 400 <= code < 600:
code_color = "red"
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
msg = click.style(flow.response.msg, fg=code_color, bold=True)
if flow.response.content == CONTENT_MISSING:
size = "(content missing)"
else:
size = netlib.utils.pretty_size(len(flow.response.content))
size = click.style(size, bold=True)
arrows = click.style("<<", bold=True)
line = "{replay} {arrows} {code} {msg} {size}".format(
replay=replay,
arrows=arrows,
code=code,
msg=msg,
size=size
)
self.echo(line)
def echo_flow(self, f):
if self.o.flow_detail == 0:
return
if f.request:
self._echo_request_line(f)
self._echo_message(f.request)
if f.response:
self._echo_response_line(f)
self._echo_message(f.response)
if f.error:
self.echo(" << {}".format(f.error.msg), bold=True, fg="red")
if self.outfile:
self.outfile.flush()
def _process_flow(self, f):
self.state.delete_flow(f)
if self.filt and not f.match(self.filt):
return
if self.o.flow_detail == 0:
return
if f.request:
print(str_request(f, self.showhost), file=self.outfile)
self._print_message(f.request)
if f.response:
if f.response.content == CONTENT_MISSING:
sz = "(content missing)"
else:
sz = netlib.utils.pretty_size(len(f.response.content))
print(
" << %s %s" %
(str_response(
f.response),
sz),
file=self.outfile)
self._print_message(f.response)
if f.error:
print(" << {}".format(f.error.msg), file=self.outfile)
self.outfile.flush()
self.echo_flow(f)
def handle_request(self, f):
flow.FlowMaster.handle_request(self, f)

View File

@ -1,6 +1,7 @@
from __future__ import (absolute_import, print_function, division)
import itertools
import sys
import traceback
import six
@ -384,9 +385,13 @@ class HttpLayer(Layer):
return
except (HttpErrorConnClosed, NetLibError, HttpError, ProtocolException) as e:
error_propagated = False
if flow.request and not flow.response:
flow.error = Error(repr(e))
flow.error = Error(str(e))
self.channel.ask("error", flow)
self.log(traceback.format_exc(), "debug")
error_propagated = True
try:
self.send_response(make_error_response(
getattr(e, "code", 502),
@ -394,6 +399,8 @@ class HttpLayer(Layer):
))
except NetLibError:
pass
if not error_propagated:
if isinstance(e, ProtocolException):
six.reraise(ProtocolException, e, sys.exc_info()[2])
else:

View File

@ -7,7 +7,7 @@ import sys
from OpenSSL import SSL
from netlib.tcp import NetLibError, ssl_read_select
from netlib.utils import cleanBin
from netlib.utils import clean_bin
from ..exceptions import ProtocolException
from .base import Layer
@ -58,7 +58,7 @@ class RawTCPLayer(Layer):
direction = "-> tcp -> {}".format(repr(self.server_conn.address))
else:
direction = "<- tcp <- {}".format(repr(self.server_conn.address))
data = cleanBin(buf[:size].tobytes())
data = clean_bin(buf[:size].tobytes())
self.log(
"{}\r\n{}".format(direction, data),
"info"

View File

@ -1,4 +1,4 @@
from __future__ import absolute_import
from __future__ import (absolute_import, print_function, division)
import os
import datetime
import re
@ -30,15 +30,16 @@ def isBin(s):
"""
for i in s:
i = ord(i)
if i < 9:
return True
elif i > 13 and i < 32:
return True
elif i > 126:
if i < 9 or 13 < i < 32 or 126 < i:
return True
return False
def isMostlyBin(s):
s = s[:100]
return sum(isBin(ch) for ch in s)/len(s) > 0.3
def isXML(s):
for i in s:
if i in "\n \t":

View File

@ -23,15 +23,17 @@ deps = {
"html2text>=2015.4.14",
"construct>=2.5.2",
"six>=1.9.0",
"lxml>=3.3.6",
"Pillow>=2.3.0",
}
# A script -> additional dependencies dict.
scripts = {
"mitmproxy": {
"urwid>=1.3",
"lxml>=3.3.6",
"Pillow>=2.3.0",
},
"mitmdump": set(),
"mitmdump": {
"click>=5.1",
},
"mitmweb": set()
}
# Developer dependencies

View File

@ -1,12 +1,9 @@
from libmproxy.exceptions import ContentViewException
from netlib.http import Headers
import sys
import netlib.utils
from netlib import encoding
import libmproxy.contentview as cv
import libmproxy.contentviews as cv
import tutils
try:
@ -21,76 +18,65 @@ except:
class TestContentView:
def test_trailer(self):
txt = "X"*10
lines = cv.trailer(txt, 1000)
assert not list(lines)
lines = cv.trailer(txt, 5)
assert list(lines)
def test_view_auto(self):
v = cv.ViewAuto()
f = v(
Headers(),
"foo",
1000
headers=Headers()
)
assert f[0] == "Raw"
f = v(
Headers(content_type="text/html"),
"<html></html>",
1000
headers=Headers(content_type="text/html")
)
assert f[0] == "HTML"
f = v(
Headers(content_type="text/flibble"),
"foo",
1000
headers=Headers(content_type="text/flibble")
)
assert f[0] == "Raw"
f = v(
Headers(content_type="text/flibble"),
"<xml></xml>",
1000
headers=Headers(content_type="text/flibble")
)
assert f[0].startswith("XML")
def test_view_urlencoded(self):
d = netlib.utils.urlencode([("one", "two"), ("three", "four")])
v = cv.ViewURLEncoded()
assert v([], d, 100)
assert v(d)
d = netlib.utils.urlencode([("adsfa", "")])
v = cv.ViewURLEncoded()
assert v([], d, 100)
assert v(d)
def test_view_html(self):
v = cv.ViewHTML()
s = "<html><br><br></br><p>one</p></html>"
assert v([], s, 1000)
assert v(s)
s = "gobbledygook"
assert not v([], s, 1000)
assert not v(s)
def test_view_html_outline(self):
v = cv.ViewHTMLOutline()
s = "<html><br><br></br><p>one</p></html>"
assert v([], s, 1000)
assert v(s)
def test_view_json(self):
cv.VIEW_CUTOFF = 100
v = cv.ViewJSON()
assert v([], "{}", 1000)
assert not v([], "{", 1000)
assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000)
assert v([], "[1, 2, 3, 4, 5]", 5)
assert v("{}")
assert not v("{")
assert v("[1, 2, 3, 4, 5]")
def test_view_xml(self):
v = cv.ViewXML()
assert v([], "<foo></foo>", 1000)
assert not v([], "<foo>", 1000)
assert v("<foo></foo>")
assert not v("<foo>")
s = """<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet title="XSL_formatting"?>
<rss
@ -99,17 +85,17 @@ class TestContentView:
version="2.0">
</rss>
"""
assert v([], s, 1000)
assert v(s)
def test_view_raw(self):
v = cv.ViewRaw()
assert v([], "foo", 1000)
assert v("foo")
def test_view_javascript(self):
v = cv.ViewJavaScript()
assert v([], "[1, 2, 3]", 100)
assert v([], "[1, 2, 3", 100)
assert v([], "function(a){[1, 2, 3]}", 100)
assert v("[1, 2, 3]")
assert v("[1, 2, 3")
assert v("function(a){[1, 2, 3]}")
def test_view_css(self):
v = cv.ViewCSS()
@ -117,14 +103,14 @@ class TestContentView:
with open(tutils.test_data.path('data/1.css'), 'r') as fp:
fixture_1 = fp.read()
result = v([], 'a', 100)
result = v('a')
if cssutils:
assert len(list(result[1])) == 0
else:
assert len(list(result[1])) == 1
result = v([], fixture_1, 100)
result = v(fixture_1)
if cssutils:
assert len(list(result[1])) > 1
@ -133,23 +119,23 @@ class TestContentView:
def test_view_hex(self):
v = cv.ViewHex()
assert v([], "foo", 1000)
assert v("foo")
def test_view_image(self):
v = cv.ViewImage()
p = tutils.test_data.path("data/image.png")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
p = tutils.test_data.path("data/image.gif")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
p = tutils.test_data.path("data/image-err1.jpg")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
p = tutils.test_data.path("data/image.ico")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
assert not v([], "flibble", sys.maxsize)
assert not v("flibble")
def test_view_multipart(self):
view = cv.ViewMultipart()
@ -161,42 +147,36 @@ Larry
--AaB03x
""".strip()
h = Headers(content_type="multipart/form-data; boundary=AaB03x")
assert view(h, v, 1000)
assert view(v, headers=h)
h = Headers()
assert not view(h, v, 1000)
assert not view(v, headers=h)
h = Headers(content_type="multipart/form-data")
assert not view(h, v, 1000)
assert not view(v, headers=h)
h = Headers(content_type="unparseable")
assert not view(h, v, 1000)
assert not view(v, headers=h)
def test_get_content_view(self):
r = cv.get_content_view(
cv.get("Raw"),
Headers(content_type="application/json"),
"[1, 2, 3]",
1000,
False
headers=Headers(content_type="application/json")
)
assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("Auto"),
Headers(content_type="application/json"),
"[1, 2, 3]",
1000,
False
headers=Headers(content_type="application/json")
)
assert r[0] == "JSON"
r = cv.get_content_view(
cv.get("Auto"),
Headers(content_type="application/json"),
"[1, 2",
1000,
False
headers=Headers(content_type="application/json")
)
assert "Raw" in r[0]
@ -204,34 +184,28 @@ Larry
ContentViewException,
cv.get_content_view,
cv.get("AMF"),
Headers(),
"[1, 2",
1000,
False
headers=Headers()
)
r = cv.get_content_view(
cv.get("Auto"),
Headers(
encoding.encode('gzip', "[1, 2, 3]"),
headers=Headers(
content_type="application/json",
content_encoding="gzip"
),
encoding.encode('gzip', "[1, 2, 3]"),
1000,
False
)
)
assert "decoded gzip" in r[0]
assert "JSON" in r[0]
r = cv.get_content_view(
cv.get("XML"),
Headers(
encoding.encode('gzip', "[1, 2, 3]"),
headers=Headers(
content_type="application/json",
content_encoding="gzip"
),
encoding.encode('gzip', "[1, 2, 3]"),
1000,
False
)
)
assert "decoded gzip" in r[0]
assert "Raw" in r[0]
@ -242,22 +216,22 @@ if pyamf:
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf01")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
p = tutils.test_data.path("data/amf02")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
def test_view_amf_response():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf03")
assert v([], file(p, "rb").read(), sys.maxsize)
assert v(file(p, "rb").read())
if cv.ViewProtobuf.is_available():
def test_view_protobuf_request():
v = cv.ViewProtobuf()
p = tutils.test_data.path("data/protobuf01")
content_type, output = v([], file(p, "rb").read(), sys.maxsize)
content_type, output = v(file(p, "rb").read())
assert content_type == "Protobuf"
assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'

View File

@ -1,5 +1,6 @@
import os
from cStringIO import StringIO
from libmproxy.exceptions import ContentViewException
from libmproxy.models import HTTPResponse
import netlib.tutils
@ -12,17 +13,51 @@ import mock
def test_strfuncs():
t = HTTPResponse.wrap(netlib.tutils.tresp())
t.is_replay = True
dump.str_response(t)
o = dump.Options()
m = dump.DumpMaster(None, o)
f = tutils.tflow()
f.client_conn = None
f.request.stickycookie = True
assert "stickycookie" in dump.str_request(f, False)
assert "stickycookie" in dump.str_request(f, True)
assert "replay" in dump.str_request(f, False)
assert "replay" in dump.str_request(f, True)
m.outfile = StringIO()
m.o.flow_detail = 0
m.echo_flow(tutils.tflow())
assert not m.outfile.getvalue()
m.o.flow_detail = 4
m.echo_flow(tutils.tflow())
assert m.outfile.getvalue()
m.outfile = StringIO()
m.echo_flow(tutils.tflow(resp=True))
assert "<<" in m.outfile.getvalue()
m.outfile = StringIO()
m.echo_flow(tutils.tflow(err=True))
assert "<<" in m.outfile.getvalue()
flow = tutils.tflow()
flow.request = netlib.tutils.treq()
flow.request.stickycookie = True
flow.client_conn = mock.MagicMock()
flow.client_conn.address.host = "foo"
flow.response = netlib.tutils.tresp(content=CONTENT_MISSING)
flow.response.is_replay = True
flow.response.code = 300
m.echo_flow(flow)
flow = tutils.tflow(resp=netlib.tutils.tresp("{"))
flow.response.headers["content-type"] = "application/json"
flow.response.code = 400
m.echo_flow(flow)
@mock.patch("libmproxy.contentviews.get_content_view")
def test_contentview(get_content_view):
get_content_view.side_effect = ContentViewException(""), ("x", iter([]))
o = dump.Options(flow_detail=4, verbosity=3)
m = dump.DumpMaster(None, o, StringIO())
m.echo_flow(tutils.tflow())
assert "Content viewer failed" in m.outfile.getvalue()
class TestDumpMaster: