mitmproxy/libmproxy/contentview.py

574 lines
15 KiB
Python
Raw Normal View History

from __future__ import absolute_import
2015-03-13 12:14:37 +00:00
import cStringIO
import json
import logging
2015-09-04 15:07:14 +00:00
import subprocess
import traceback
2015-03-13 12:14:37 +00:00
import lxml.html
import lxml.etree
from PIL import Image
from PIL.ExifTags import TAGS
import html2text
import netlib.utils
from . import utils
from .contrib import jsbeautifier
from .contrib.wbxml.ASCommandResponse import ASCommandResponse
2015-09-05 18:45:58 +00:00
from netlib import encoding
2015-03-13 12:14:37 +00:00
2012-08-18 05:51:34 +00:00
try:
import pyamf
from pyamf import remoting, flex
2015-05-30 00:03:28 +00:00
except ImportError: # pragma nocover
2012-08-18 05:51:34 +00:00
pyamf = None
try:
import cssutils
2015-05-30 00:03:28 +00:00
except ImportError: # pragma nocover
cssutils = None
else:
cssutils.log.setLevel(logging.CRITICAL)
cssutils.ser.prefs.keepComments = True
cssutils.ser.prefs.omitLastSemicolon = False
cssutils.ser.prefs.indentClosingBrace = False
cssutils.ser.prefs.validOnly = False
2015-05-30 00:03:28 +00:00
# 50 KiB. Not referenced by the code visible in this file; presumably the
# default display limit applied by callers -- TODO confirm against the UI code.
VIEW_CUTOFF = 1024 * 50
2015-09-04 15:07:14 +00:00
# Upper bound on the key length format_dict() takes into account when it
# computes the common padding width for keys.
KEY_MAX = 30
2015-09-04 15:07:14 +00:00
def format_dict(d):
    """
    Render a dictionary as aligned key/value display rows.

    Args:
        d: A dictionary with string keys.

    Yields:
        One (("header", key), ("text", value)) tuple per entry. Each key gets
        a trailing colon and is left-justified to a common width of
        min(longest key length, KEY_MAX) + 2 characters.

    An empty dictionary yields nothing.
    """
    if not d:
        # max() raises ValueError on an empty sequence; because this is a
        # generator, that error would only surface once the caller iterates.
        return
    max_key_len = max(len(k) for k in d.keys())
    max_key_len = min(max_key_len, KEY_MAX)
    for key, value in d.items():
        key += ":"
        key = key.ljust(max_key_len + 2)
        yield (
            ("header", key),
            ("text", value)
        )
2015-09-02 18:56:19 +00:00
2015-09-04 15:07:14 +00:00
def format_text(content, limit):
    """
    Turn a chunk of content into a stream of text rows.

    The content is passed through netlib.utils.cleanBin, cut to at most
    `limit` characters and split into lines.

    Yields:
        A ("text", line) tuple for every line of the visible part, followed
        by whatever trailer() yields for the cleaned content and `limit`.
    """
    cleaned = netlib.utils.cleanBin(content)
    visible = cleaned[:limit]
    for line in visible.splitlines():
        yield ("text", line)
    for notice in trailer(cleaned, limit):
        yield notice
def trailer(content, limit):
    """
    Produce the "data not shown" notice for truncated content.

    Yields:
        A single ("cutoff", message) tuple when `content` is longer than
        `limit`; nothing otherwise.
    """
    hidden = len(content) - limit
    if hidden <= 0:
        return
    size = netlib.utils.pretty_size(hidden)
    yield (
        "cutoff",
        "... {} of data not shown.".format(size)
    )
# Legacy urwid-based versions of the text/trailer helpers, kept as an inert
# module-level string literal. Nothing inside it is executed.
"""
def _view_text(content, total, limit):
""
Generates a body for a chunk of text.
""
txt = []
for i in netlib.utils.cleanBin(content).splitlines():
txt.append(
urwid.Text(("text", i), wrap="any")
)
trailer(total, txt, limit)
return txt
2012-08-18 05:08:17 +00:00
def trailer(clen, txt, limit):
rem = clen - limit
if rem > 0:
txt.append(urwid.Text(""))
txt.append(
2012-08-18 05:08:17 +00:00
urwid.Text(
[
2015-05-30 00:03:28 +00:00
("highlight", "... %s of data not shown. Press " % netlib.utils.pretty_size(rem)),
2012-08-18 05:08:17 +00:00
("key", "f"),
("highlight", " to load all data.")
]
)
)
2015-09-04 15:07:14 +00:00
"""
2015-09-04 15:33:21 +00:00
class View(object):
    """
    Base class for content views.

    Concrete views override the class attributes below and implement
    __call__.
    """
    # Display name; get() looks views up by this value.
    name = None
    # Prompt pair; get_by_shortcut() matches on its second element.
    prompt = ()
    # Content types this view is registered for in content_types_map.
    content_types = []

    def __call__(self, hdrs, content, limit):
        """
        Render `content`.

        Returns:
            A (mode name, content generator) tuple.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
class ViewAuto(View):
    """
    Chooses another view based on the content-type header.
    """
    name = "Auto"
    prompt = ("auto", "a")
    content_types = []

    def __call__(self, hdrs, content, limit):
        """
        Delegate to the most suitable registered view.

        When a content-type header is present, the first view registered for
        that type in content_types_map is used; failing that, the XML view is
        used if utils.isXML(content) is true. In every other case the Raw
        view renders the content.

        Returns:
            Whatever the selected view returns.
        """
        ctype = hdrs.get("content-type")
        if ctype:
            # NOTE(review): parse_content_type is assumed to return a falsy
            # value for headers it cannot parse (the previous code allowed
            # for None here) -- confirm against netlib. Indexing that result
            # would raise, so unparseable headers skip the map lookup.
            parsed = netlib.utils.parse_content_type(ctype)
            if parsed:
                ct = "%s/%s" % (parsed[0], parsed[1])
                if ct in content_types_map:
                    return content_types_map[ct][0](hdrs, content, limit)
            if utils.isXML(content):
                return get("XML")(hdrs, content, limit)
        return get("Raw")(hdrs, content, limit)
2015-09-04 15:07:14 +00:00
class ViewRaw(View):
    """
    Shows the content as text via format_text, with no further processing.
    """
    name = "Raw"
    prompt = ("raw", "r")
    content_types = []

    def __call__(self, hdrs, content, limit):
        """Return ("Raw", rows) where rows come from format_text."""
        rows = format_text(content, limit)
        return "Raw", rows
2015-09-04 15:07:14 +00:00
class ViewHex(View):
    """
    Shows a hex dump of the first `limit` bytes of the content.
    """
    name = "Hex"
    prompt = ("hex", "e")
    content_types = []

    @staticmethod
    def _format(content, limit):
        """
        Yield one (offset, hex, text) row per hexdump line of the visible
        part, followed by the trailer for the full content.
        """
        visible = content[:limit]
        for offset, hexa, s in netlib.utils.hexdump(visible):
            row = (
                ("offset", offset + " "),
                ("text", hexa + " "),
                ("text", s),
            )
            yield row
        for notice in trailer(content, limit):
            yield notice

    def __call__(self, hdrs, content, limit):
        """Return ("Hex", rows)."""
        rows = self._format(content, limit)
        return "Hex", rows
2012-08-18 05:08:17 +00:00
2015-09-04 15:07:14 +00:00
class ViewXML(View):
    """
    Pretty-prints XML content with lxml.
    """
    name = "XML"
    prompt = ("xml", "x")
    content_types = ["text/xml"]

    def __call__(self, hdrs, content, limit):
        """
        Parse and re-serialise the document with pretty printing.

        Returns:
            ("XML-like data", rows), or None when lxml reports a syntax
            error while parsing.
        """
        parser = lxml.etree.XMLParser(
            remove_blank_text=True,
            resolve_entities=False,
            strip_cdata=False,
            recover=False
        )
        try:
            document = lxml.etree.fromstring(content, parser)
        except lxml.etree.XMLSyntaxError:
            return None

        tree = document.getroottree()
        docinfo = tree.docinfo

        # Serialise the nodes that precede the root element, in document
        # order, so they can be emitted again alongside the doctype.
        preceding = []
        node = tree.getroot().getprevious()
        while node is not None:
            preceding.append(lxml.etree.tostring(node))
            node = node.getprevious()
        preceding.reverse()

        doctype = docinfo.doctype
        if preceding:
            doctype += "\n".join(preceding).strip()
        doctype = doctype.strip()

        s = lxml.etree.tostring(
            document,
            pretty_print=True,
            xml_declaration=True,
            doctype=doctype or None,
            encoding=docinfo.encoding
        )
        return "XML-like data", format_text(s, limit)
2012-08-18 05:08:17 +00:00
2015-09-04 15:07:14 +00:00
class ViewJSON(View):
    """
    Pretty-prints JSON content.
    """
    name = "JSON"
    prompt = ("json", "s")
    content_types = ["application/json"]

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("JSON", rows), or None when utils.pretty_json produced nothing,
            so that the caller can fall back to another view.
        """
        pretty_json = utils.pretty_json(content)
        # NOTE(review): pretty_json is defined outside this file; it is
        # assumed to return a falsy value when content is not valid JSON --
        # confirm. format_text is a generator, so without this guard a bad
        # value would only blow up later, while the rows are being consumed.
        if not pretty_json:
            return None
        return "JSON", format_text(pretty_json, limit)
2012-08-18 05:08:17 +00:00
2015-09-04 15:33:21 +00:00
class ViewHTML(View):
    """
    Pretty-prints HTML content with lxml.
    """
    name = "HTML"
    prompt = ("html", "h")
    content_types = ["text/html"]

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("HTML", rows) when utils.isXML(content) is true, None otherwise.
        """
        if not utils.isXML(content):
            return None
        parser = lxml.etree.HTMLParser(
            strip_cdata=True,
            remove_blank_text=True
        )
        tree = lxml.html.fromstring(content, parser=parser)
        pretty = lxml.etree.tostring(
            tree,
            pretty_print=True,
            doctype=tree.getroottree().docinfo.doctype
        )
        return "HTML", format_text(pretty, limit)
2012-08-18 05:08:17 +00:00
2015-09-04 15:33:21 +00:00
class ViewHTMLOutline(View):
    """
    Shows HTML content converted to text by html2text.
    """
    name = "HTML Outline"
    prompt = ("html outline", "o")
    content_types = ["text/html"]

    def __call__(self, hdrs, content, limit):
        """
        Decode the content as UTF-8 and run it through html2text with images
        ignored and line wrapping disabled (body_width = 0).

        Returns:
            ("HTML Outline", rows).
        """
        decoded = content.decode("utf-8")
        converter = html2text.HTML2Text(baseurl="")
        converter.ignore_images = True
        converter.body_width = 0
        outline = converter.handle(decoded)
        return "HTML Outline", format_text(outline, limit)
2012-08-18 05:08:17 +00:00
2015-09-04 15:33:21 +00:00
class ViewURLEncoded(View):
    """
    Shows URL-encoded form data as aligned key/value rows.
    """
    name = "URL-encoded"
    prompt = ("urlencoded", "u")
    content_types = ["application/x-www-form-urlencoded"]

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("URLEncoded form", rows), or None when nothing could be decoded
            so that the caller can fall back to another view.
        """
        d = netlib.utils.urldecode(content)
        if not d:
            return None
        # format_dict needs a real dictionary (it calls .keys()/.items()).
        # NOTE(review): urldecode presumably returns a list of (key, value)
        # pairs -- confirm against netlib. dict() accepts both pairs and
        # mappings, matching the conversion ViewMultipart performs.
        return "URLEncoded form", format_dict(dict(d))
2012-08-18 05:08:17 +00:00
2015-09-04 15:33:21 +00:00
class ViewMultipart(View):
    """
    Shows multipart form data as aligned key/value rows.
    """
    name = "Multipart Form"
    prompt = ("multipart", "m")
    content_types = ["multipart/form-data"]

    @staticmethod
    def _format(v):
        """
        Yield a "Form data:" heading followed by the format_dict rows built
        from the (key, value) pairs in `v`.
        """
        yield ("highlight", "Form data:\n")
        fields = {name: value for name, value in v}
        for row in format_dict(fields):
            yield row

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("Multipart form", rows), or None when multipartdecode returned
            nothing.
        """
        parts = netlib.utils.multipartdecode(hdrs, content)
        if not parts:
            return None
        return "Multipart form", self._format(parts)
2012-08-18 05:08:17 +00:00
if pyamf:
    class DummyObject(dict):
        """
        Dictionary-backed class returned by pyamf_class_loader when no other
        registered class loader produces a class.
        """

        def __init__(self, alias):
            # The alias argument is accepted but not stored.
            dict.__init__(self)

        def __readamf__(self, input):
            # Read one object from the stream and keep it under "data".
            data = input.readObject()
            self["data"] = data

    def pyamf_class_loader(s):
        """
        Class loader that defers to every other loader registered with pyamf
        and falls back to DummyObject when none of them returns a class.
        """
        for i in pyamf.CLASS_LOADERS:
            # Skip ourselves, otherwise this would recurse forever.
            if i != pyamf_class_loader:
                v = i(s)
                if v:
                    return v
        return DummyObject

    pyamf.register_class_loader(pyamf_class_loader)

    class ViewAMF(View):
        """
        Shows AMF remoting envelopes as JSON.
        """
        name = "AMF"
        prompt = ("amf", "f")
        content_types = ["application/x-amf"]

        def unpack(self, b, seen=None):
            """
            Recursively convert a decoded AMF value into plain containers.

            Args:
                b: The value to convert. Dictionaries are updated in place.
                seen: Set of ids of DummyObject instances already visited in
                    this traversal. Callers normally omit it.

            Returns:
                The converted value; "<recursion>" replaces a DummyObject
                that was already visited.
            """
            if seen is None:
                # A fresh set per top-level call. A shared default set would
                # persist between calls and could report stale ids.
                seen = set()
            if hasattr(b, "body"):
                return self.unpack(b.body, seen)
            if isinstance(b, DummyObject):
                if id(b) in seen:
                    return "<recursion>"
                else:
                    seen.add(id(b))
                    for k, v in b.items():
                        b[k] = self.unpack(v, seen)
                    return b
            elif isinstance(b, dict):
                for k, v in b.items():
                    b[k] = self.unpack(v, seen)
                return b
            elif isinstance(b, list):
                # Pass `seen` along so recursion through lists is detected.
                return [self.unpack(i, seen) for i in b]
            elif isinstance(b, flex.ArrayCollection):
                return [self.unpack(i, seen) for i in b]
            else:
                return b

        def _format(self, envelope, limit):
            """
            Yield, for every (target, message) pair in the envelope, a
            header row followed by the message rendered as indented JSON.
            `limit` is applied to each message's JSON text separately.
            """
            for target, message in iter(envelope):
                if isinstance(message, pyamf.remoting.Request):
                    yield (
                        ("header", "Request: "),
                        ("text", str(target)),
                    )
                else:
                    yield (
                        ("header", "Response: "),
                        ("text", "%s, code %s" % (target, message.status)),
                    )

                s = json.dumps(self.unpack(message), indent=4)
                for msg in format_text(s, limit):
                    yield msg

        def __call__(self, hdrs, content, limit):
            """
            Returns:
                ("AMF v<version>", rows), or None when decoding produced an
                empty envelope.
            """
            envelope = remoting.decode(content, strict=False)
            if not envelope:
                return None
            return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit)
2012-08-18 05:08:17 +00:00
2015-09-04 15:33:21 +00:00
class ViewJavaScript(View):
    """
    Shows JavaScript reformatted by jsbeautifier.
    """
    name = "JavaScript"
    prompt = ("javascript", "j")
    content_types = [
        "application/x-javascript",
        "application/javascript",
        "text/javascript"
    ]

    @staticmethod
    def _format(beautified, content, limit):
        """
        Yield a ("text", line) row for every line of the beautified source,
        followed by the trailer for the original content.
        """
        for line in netlib.utils.cleanBin(beautified).splitlines():
            yield ("text", line)
        # The notice counts the original bytes that were never beautified.
        for msg in trailer(content, limit):
            yield msg

    def __call__(self, hdrs, content, limit):
        """
        Beautify the first `limit` bytes of the content.

        As in ViewHex, `limit` bounds the input that is processed, and all
        output derived from that input is shown. Passing a limit reduced by
        the cut-off to format_text (the previous approach) truncated the
        beautified output and went negative for content longer than twice
        the limit.

        Returns:
            ("JavaScript", rows).
        """
        opts = jsbeautifier.default_options()
        opts.indent_size = 2
        res = jsbeautifier.beautify(content[:limit], opts)
        return "JavaScript", self._format(res, content, limit)
2015-03-13 12:14:37 +00:00
2015-09-04 15:33:21 +00:00
class ViewCSS(View):
    """
    Shows CSS, reformatted by cssutils when that package is available.
    """
    name = "CSS"
    prompt = ("css", "c")
    content_types = [
        "text/css"
    ]

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("CSS", rows). Without cssutils the content is shown unchanged.
        """
        beautified = content
        if cssutils:
            beautified = cssutils.parseString(content).cssText
        return "CSS", format_text(beautified, limit)
2012-08-18 05:08:17 +00:00
2015-09-04 15:33:21 +00:00
class ViewImage(View):
    """
    Shows image metadata (format, size, mode, info entries and EXIF tags).
    """
    name = "Image"
    prompt = ("image", "i")
    content_types = [
        "image/png",
        "image/jpeg",
        "image/gif",
        "image/vnd.microsoft.icon",
        "image/x-icon",
    ]

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("<format> image", rows), or None when PIL cannot open the
            content as an image.
        """
        try:
            img = Image.open(cStringIO.StringIO(content))
        except IOError:
            return None

        parts = [
            ("Format", str(img.format_description)),
            ("Size", "%s x %s px" % img.size),
            ("Mode", str(img.mode)),
        ]
        # Every info entry except the raw EXIF blob, in sorted key order.
        parts.extend(
            (str(key), str(img.info[key]))
            for key in sorted(img.info.keys())
            if key != "exif"
        )
        if hasattr(img, "_getexif"):
            exif = img._getexif()
            if exif:
                for tag_id in sorted(exif.keys()):
                    # Use the symbolic tag name when PIL knows one.
                    tag = TAGS.get(tag_id, tag_id)
                    parts.append((str(tag), str(exif[tag_id])))

        cleaned = dict(
            (netlib.utils.cleanBin(label), netlib.utils.cleanBin(value))
            for label, value in parts
        )
        return "%s image" % img.format, format_dict(cleaned)
2012-08-18 05:08:17 +00:00
2015-03-13 12:14:37 +00:00
2015-09-04 15:33:21 +00:00
class ViewProtobuf(View):
    """Human friendly view of protocol buffers
    The view uses the protoc compiler to decode the binary
    """

    name = "Protocol Buffer"
    prompt = ("protobuf", "p")
    content_types = [
        "application/x-protobuf",
        "application/x-protobuffer",
    ]

    @staticmethod
    def is_available():
        """
        Check whether a usable protoc binary can be executed.

        Returns:
            True when `protoc --version` runs and its output starts with
            "libprotoc"; False when that is not the case or running it
            fails.
        """
        try:
            p = subprocess.Popen(
                ["protoc", "--version"],
                stdout=subprocess.PIPE
            )
            out, _ = p.communicate()
            return out.startswith("libprotoc")
        except Exception:
            # Deliberately broad: any failure means "not available". Unlike
            # a bare except, this lets KeyboardInterrupt and SystemExit
            # propagate.
            return False

    def decode_protobuf(self, content):
        """
        Pipe `content` through `protoc --decode_raw`.

        Returns:
            protoc's standard output, or its standard error when standard
            output is empty.
        """
        # if Popen raises OSError, it will be caught in
        # get_content_view and fall back to Raw
        p = subprocess.Popen(['protoc', '--decode_raw'],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = p.communicate(input=content)
        if out:
            return out
        else:
            return err

    def __call__(self, hdrs, content, limit):
        """Return ("Protobuf", rows) for the decoded content."""
        decoded = self.decode_protobuf(content)
        return "Protobuf", format_text(decoded, limit)
2012-08-18 05:08:17 +00:00
2015-03-13 12:14:37 +00:00
2015-09-04 15:33:21 +00:00
class ViewWBXML(View):
    """
    Shows WBXML content converted to XML text by ASCommandResponse.
    """
    name = "WBXML"
    prompt = ("wbxml", "w")
    content_types = [
        "application/vnd.wap.wbxml",
        "application/vnd.ms-sync.wbxml"
    ]

    def __call__(self, hdrs, content, limit):
        """
        Returns:
            ("WBXML", rows), or None when the content cannot be parsed.
        """
        try:
            parser = ASCommandResponse(content)
            parsedContent = parser.xmlString
        except Exception:
            # Best effort: any parser failure means "cannot display". Unlike
            # a bare except, this lets KeyboardInterrupt and SystemExit
            # propagate.
            return None
        return "WBXML", format_text(parsedContent, limit)
2014-05-06 17:27:13 +00:00
2015-09-04 15:07:14 +00:00
2012-08-18 05:08:17 +00:00
# All available views. The order is significant: content_types_map keeps the
# views for a content type in this order and ViewAuto uses the first one.
views = [
    ViewAuto(),
    ViewRaw(),
    ViewHex(),
    ViewJSON(),
    ViewXML(),
    ViewWBXML(),
    ViewHTML(),
    ViewHTMLOutline(),
    ViewJavaScript(),
    ViewCSS(),
    ViewURLEncoded(),
    ViewMultipart(),
    ViewImage(),
]

# Views that depend on optional components are registered last.
if pyamf:
    views.append(ViewAMF())

if ViewProtobuf.is_available():
    views.append(ViewProtobuf())
2012-08-18 05:08:17 +00:00
# Maps a content type to the list of views registered for it, in the order
# the views appear in `views`.
content_types_map = {}
for view in views:
    for content_type in view.content_types:
        content_types_map.setdefault(content_type, []).append(view)

# The prompt of every view, in registration order.
view_prompts = [view.prompt for view in views]
2012-08-18 05:08:17 +00:00
def get_by_shortcut(c):
    """
    Return the first view whose prompt shortcut (second prompt element)
    equals `c`, or None when no view matches.
    """
    return next((view for view in views if view.prompt[1] == c), None)
def get(name):
    """
    Return the first view whose name equals `name`, or None when no view
    matches.
    """
    return next((view for view in views if view.name == name), None)
def get_content_view(viewmode, headers, content, limit, is_request, log=None):
    """
    Render `content` with `viewmode`, falling back to the Raw view.

    Args:
        viewmode: Callable taking (headers, content, limit), normally a View.
        headers: Message headers; "content-encoding" is looked up via .get().
        content: The message body.
        limit: Passed through to the view.
        is_request: Only selects the message returned for empty content.
        log: Optional callable invoked as log(message, "error") when the
            view raises.

    Returns:
        A (msg, body) tuple. For empty content the body is "". Otherwise the
        body is the second element of the view's result, and msg joins an
        optional "[decoded <encoding>]" note with the view's description.

    Exceptions raised by the viewmode call itself are caught here and lead to
    the Raw fallback; errors raised later, while a returned generator is
    consumed, are not handled by this function.
    """
    if not content:
        if is_request:
            return "No request content (press tab to view response)", ""
        return "No content", ""

    description = []

    enc = headers.get("content-encoding")
    if enc and enc != "identity":
        decoded = encoding.decode(enc, content)
        if decoded:
            content = decoded
            description.append("[decoded %s]" % enc)

    try:
        result = viewmode(headers, content, limit)
    # Third-party viewers can fail in unexpected ways...
    except Exception:
        if log:
            log("Content viewer failed: \n" + traceback.format_exc(), "error")
        result = None

    if result:
        description.append(result[0])
    else:
        result = get("Raw")(headers, content, limit)
        description.append("Couldn't parse: falling back to Raw")
    return " ".join(description), result[1]