Merge pull request #764 from mitmproxy/contentviews

Extract Content Views from Console
This commit is contained in:
Maximilian Hils 2015-09-11 15:31:25 +02:00
commit 02d80631dc
9 changed files with 186 additions and 163 deletions

View File

@ -14,9 +14,9 @@ import traceback
import urwid
import weakref
from .. import controller, flow, script
from .. import controller, flow, script, contentview
from . import flowlist, flowview, help, window, signals, options
from . import grideditor, palettes, contentview, statusbar, palettepicker
from . import grideditor, palettes, statusbar, palettepicker
EVENTLOG_SIZE = 500

View File

@ -1,15 +1,17 @@
from __future__ import absolute_import
import os
import sys
import traceback
import urwid
from netlib import odict
from netlib.http.semantics import CONTENT_MISSING, Headers
from . import common, grideditor, contentview, signals, searchable, tabs
from . import common, grideditor, signals, searchable, tabs
from . import flowdetailview
from .. import utils, controller
from .. import utils, controller, contentview
from ..models import HTTPRequest, HTTPResponse, decoded
from ..exceptions import ContentViewException
class SearchError(Exception):
@ -179,15 +181,29 @@ class FlowView(tabs.Tabs):
limit = sys.maxsize
else:
limit = contentview.VIEW_CUTOFF
description, text_objects = cache.get(
contentview.get_content_view,
return cache.get(
self._get_content_view,
viewmode,
conn.headers,
conn.content,
limit,
isinstance(conn, HTTPRequest)
)
return (description, text_objects)
def _get_content_view(self, viewmode, headers, content, limit, is_request):
try:
description, lines = contentview.get_content_view(
viewmode, headers, content, limit, is_request
)
except ContentViewException:
s = "Content viewer failed: \n" + traceback.format_exc()
signals.add_event(s, "error")
description, lines = contentview.get_content_view(
viewmode, headers, content, limit, is_request
)
description = description.replace("Raw", "Couldn't parse: falling back to Raw")
text_objects = [urwid.Text(l) for l in lines]
return description, text_objects
def viewmode_get(self):
override = self.state.get_flow_setting(

View File

@ -1,6 +1,7 @@
import urwid
from . import common, signals, grideditor, contentview
from .. import contentview
from . import common, signals, grideditor
from . import select, palettes
footer = [

View File

@ -2,22 +2,24 @@ from __future__ import absolute_import
import cStringIO
import json
import logging
import subprocess
import sys
import lxml.html
import lxml.etree
from PIL import Image
from PIL.ExifTags import TAGS
import subprocess
import traceback
import urwid
import html2text
import six
import netlib.utils
from netlib.odict import ODict
from netlib import encoding
from . import common, signals
from .. import utils
from ..contrib import jsbeautifier
from ..contrib.wbxml.ASCommandResponse import ASCommandResponse
import netlib.utils
from . import utils
from .exceptions import ContentViewException
from .contrib import jsbeautifier
from .contrib.wbxml.ASCommandResponse import ASCommandResponse
try:
import pyamf
@ -38,37 +40,65 @@ else:
cssutils.ser.prefs.validOnly = False
VIEW_CUTOFF = 1024 * 50
KEY_MAX = 30
def _view_text(content, total, limit):
def format_dict(d):
"""
Generates a body for a chunk of text.
Transforms the given dictionary into a list of
("key", key )
("value", value)
tuples, where key is padded to a uniform width.
"""
txt = []
for i in netlib.utils.cleanBin(content).splitlines():
txt.append(
urwid.Text(("text", i), wrap="any")
)
trailer(total, txt, limit)
return txt
max_key_len = max(len(k) for k in d.keys())
max_key_len = min(max_key_len, KEY_MAX)
for key, value in d.items():
key += ":"
key = key.ljust(max_key_len + 2)
yield [
("header", key),
("text", value)
]
def trailer(clen, txt, limit):
rem = clen - limit
if rem > 0:
txt.append(urwid.Text(""))
txt.append(
urwid.Text(
[
("highlight", "... %s of data not shown. Press " % netlib.utils.pretty_size(rem)),
("key", "f"),
("highlight", " to load all data.")
]
)
)
def format_text(content, limit):
"""
Transforms the given content into
"""
content = netlib.utils.cleanBin(content)
for line in content[:limit].splitlines():
yield [("text", line)]
for msg in trailer(content, limit):
yield msg
class ViewAuto:
def trailer(content, limit):
bytes_removed = len(content) - limit
if bytes_removed > 0:
yield [
("cutoff", "... {} of data not shown.".format(netlib.utils.pretty_size(bytes_removed)))
]
class View(object):
name = None
prompt = ()
content_types = []
def __call__(self, hdrs, content, limit):
"""
Returns:
A (description, content generator) tuple.
The content generator yields lists of (style, text) tuples.
Iit must not yield tuples of tuples, because urwid cannot process that.
"""
raise NotImplementedError()
class ViewAuto(View):
name = "Auto"
prompt = ("auto", "a")
content_types = []
@ -85,36 +115,36 @@ class ViewAuto:
return get("Raw")(hdrs, content, limit)
class ViewRaw:
class ViewRaw(View):
name = "Raw"
prompt = ("raw", "r")
content_types = []
def __call__(self, hdrs, content, limit):
txt = _view_text(content[:limit], len(content), limit)
return "Raw", txt
return "Raw", format_text(content, limit)
class ViewHex:
class ViewHex(View):
name = "Hex"
prompt = ("hex", "e")
content_types = []
def __call__(self, hdrs, content, limit):
txt = []
@staticmethod
def _format(content, limit):
for offset, hexa, s in netlib.utils.hexdump(content[:limit]):
txt.append(urwid.Text([
("offset", offset),
" ",
("text", hexa),
" ",
("text", s),
]))
trailer(len(content), txt, limit)
return "Hex", txt
yield [
("offset", offset + " "),
("text", hexa + " "),
("text", s)
]
for msg in trailer(content, limit):
yield msg
def __call__(self, hdrs, content, limit):
return "Hex", self._format(content, limit)
class ViewXML:
class ViewXML(View):
name = "XML"
prompt = ("xml", "x")
content_types = ["text/xml"]
@ -150,40 +180,24 @@ class ViewXML:
pretty_print=True,
xml_declaration=True,
doctype=doctype or None,
encoding = docinfo.encoding
encoding=docinfo.encoding
)
txt = []
for i in s[:limit].strip().split("\n"):
txt.append(
urwid.Text(("text", i)),
)
trailer(len(content), txt, limit)
return "XML-like data", txt
return "XML-like data", format_text(s, limit)
class ViewJSON:
class ViewJSON(View):
name = "JSON"
prompt = ("json", "s")
content_types = ["application/json"]
def __call__(self, hdrs, content, limit):
lines = utils.pretty_json(content)
if lines:
txt = []
sofar = 0
for i in lines:
sofar += len(i)
txt.append(
urwid.Text(("text", i)),
)
if sofar > limit:
break
trailer(sum(len(i) for i in lines), txt, limit)
return "JSON", txt
pretty_json = utils.pretty_json(content)
if pretty_json:
return "JSON", format_text(pretty_json, limit)
class ViewHTML:
class ViewHTML(View):
name = "HTML"
prompt = ("html", "h")
content_types = ["text/html"]
@ -201,10 +215,10 @@ class ViewHTML:
pretty_print=True,
doctype=docinfo.doctype
)
return "HTML", _view_text(s[:limit], len(s), limit)
return "HTML", format_text(s, limit)
class ViewHTMLOutline:
class ViewHTMLOutline(View):
name = "HTML Outline"
prompt = ("html outline", "o")
content_types = ["text/html"]
@ -215,43 +229,34 @@ class ViewHTMLOutline:
h.ignore_images = True
h.body_width = 0
content = h.handle(content)
txt = _view_text(content[:limit], len(content), limit)
return "HTML Outline", txt
return "HTML Outline", format_text(content, limit)
class ViewURLEncoded:
class ViewURLEncoded(View):
name = "URL-encoded"
prompt = ("urlencoded", "u")
content_types = ["application/x-www-form-urlencoded"]
def __call__(self, hdrs, content, limit):
lines = netlib.utils.urldecode(content)
if lines:
body = common.format_keyvals(
[(k + ":", v) for (k, v) in lines],
key = "header",
val = "text"
)
return "URLEncoded form", body
d = netlib.utils.urldecode(content)
return "URLEncoded form", format_dict(ODict(d))
class ViewMultipart:
class ViewMultipart(View):
name = "Multipart Form"
prompt = ("multipart", "m")
content_types = ["multipart/form-data"]
@staticmethod
def _format(v):
yield [("highlight", "Form data:\n")]
for message in format_dict(ODict(v)):
yield message
def __call__(self, hdrs, content, limit):
v = netlib.utils.multipartdecode(hdrs, content)
if v:
r = [
urwid.Text(("highlight", "Form data:\n")),
]
r.extend(common.format_keyvals(
v,
key = "header",
val = "text"
))
return "Multipart form", r
return "Multipart form", self._format(v)
if pyamf:
@ -263,6 +268,7 @@ if pyamf:
data = input.readObject()
self["data"] = data
def pyamf_class_loader(s):
for i in pyamf.CLASS_LOADERS:
if i != pyamf_class_loader:
@ -271,9 +277,11 @@ if pyamf:
return v
return DummyObject
pyamf.register_class_loader(pyamf_class_loader)
class ViewAMF:
class ViewAMF(View):
name = "AMF"
prompt = ("amf", "f")
content_types = ["application/x-amf"]
@ -300,31 +308,30 @@ if pyamf:
else:
return b
def __call__(self, hdrs, content, limit):
envelope = remoting.decode(content, strict=False)
if not envelope:
return None
txt = []
def _format(self, envelope, limit):
for target, message in iter(envelope):
if isinstance(message, pyamf.remoting.Request):
txt.append(urwid.Text([
yield [
("header", "Request: "),
("text", str(target)),
]))
]
else:
txt.append(urwid.Text([
yield [
("header", "Response: "),
("text", "%s, code %s" % (target, message.status)),
]))
]
s = json.dumps(self.unpack(message), indent=4)
txt.extend(_view_text(s[:limit], len(s), limit))
for msg in format_text(s, limit):
yield msg
return "AMF v%s" % envelope.amfVersion, txt
def __call__(self, hdrs, content, limit):
envelope = remoting.decode(content, strict=False)
if envelope:
return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit)
class ViewJavaScript:
class ViewJavaScript(View):
name = "JavaScript"
prompt = ("javascript", "j")
content_types = [
@ -337,10 +344,11 @@ class ViewJavaScript:
opts = jsbeautifier.default_options()
opts.indent_size = 2
res = jsbeautifier.beautify(content[:limit], opts)
return "JavaScript", _view_text(res, len(res), limit)
cutoff = max(0, len(content) - limit)
return "JavaScript", format_text(res, limit - cutoff)
class ViewCSS:
class ViewCSS(View):
name = "CSS"
prompt = ("css", "c")
content_types = [
@ -354,10 +362,10 @@ class ViewCSS:
else:
beautified = content
return "CSS", _view_text(beautified, len(beautified), limit)
return "CSS", format_text(beautified, limit)
class ViewImage:
class ViewImage(View):
name = "Image"
prompt = ("image", "i")
content_types = [
@ -396,15 +404,11 @@ class ViewImage:
clean.append(
[netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])]
)
fmt = common.format_keyvals(
clean,
key = "header",
val = "text"
)
fmt = format_dict(ODict(clean))
return "%s image" % img.format, fmt
class ViewProtobuf:
class ViewProtobuf(View):
"""Human friendly view of protocol buffers
The view uses the protoc compiler to decode the binary
"""
@ -443,11 +447,10 @@ class ViewProtobuf:
def __call__(self, hdrs, content, limit):
decoded = self.decode_protobuf(content)
txt = _view_text(decoded[:limit], len(decoded), limit)
return "Protobuf", txt
return "Protobuf", format_text(decoded, limit)
class ViewWBXML:
class ViewWBXML(View):
name = "WBXML"
prompt = ("wbxml", "w")
content_types = [
@ -460,11 +463,12 @@ class ViewWBXML:
try:
parser = ASCommandResponse(content)
parsedContent = parser.xmlString
txt = _view_text(parsedContent, len(parsedContent), limit)
return "WBXML", txt
if parsedContent:
return "WBXML", format_text(parsedContent, limit)
except:
return None
views = [
ViewAuto(),
ViewRaw(),
@ -492,7 +496,6 @@ for i in views:
l = content_types_map.setdefault(ct, [])
l.append(i)
view_prompts = [i.prompt for i in views]
@ -510,13 +513,17 @@ def get(name):
def get_content_view(viewmode, headers, content, limit, is_request):
"""
Returns a (msg, body) tuple.
Returns:
A (description, content generator) tuple.
Raises:
ContentViewException, if the content view threw an error.
"""
if not content:
if is_request:
return "No request content (press tab to view response)", ""
return "No request content (press tab to view response)", []
else:
return "No content", ""
return "No content", []
msg = []
enc = headers.get("content-encoding")
@ -528,11 +535,12 @@ def get_content_view(viewmode, headers, content, limit, is_request):
try:
ret = viewmode(headers, content, limit)
# Third-party viewers can fail in unexpected ways...
except Exception:
s = traceback.format_exc()
s = "Content viewer failed: \n" + s
signals.add_event(s, "error")
ret = None
except Exception as e:
six.reraise(
ContentViewException,
ContentViewException(str(e)),
sys.exc_info()[2]
)
if not ret:
ret = get("Raw")(headers, content, limit)
msg.append("Couldn't parse: falling back to Raw")

View File

@ -38,10 +38,10 @@ class ASCommandResponse:
if ( len(response) > 0):
self.xmlString = self.decodeWBXML(self.wbxmlBody)
else:
logging.error("Empty WBXML body passed")
raise ValueError("Empty WBXML body passed")
except Exception as e:
logging.error("Error: {0}".format(e.message))
self.xmlString = None
raise ValueError("Error: {0}".format(e.message))
def getWBXMLBytes(self):
return self.wbxmlBytes
@ -70,4 +70,3 @@ if __name__ == "__main__":
logging.info("-"*100)
instance = ASCommandResponse(byteWBXML)
logging.info(instance.xmlString)

View File

@ -51,3 +51,7 @@ class InvalidCredentials(HttpException):
class ServerException(ProxyException):
pass
class ContentViewException(ProxyException):
pass

View File

@ -54,7 +54,7 @@ def pretty_json(s):
p = json.loads(s)
except ValueError:
return None
return json.dumps(p, sort_keys=True, indent=4).split("\n")
return json.dumps(p, sort_keys=True, indent=4)
def pretty_duration(secs):

View File

@ -3,10 +3,8 @@ from nose.plugins.skip import SkipTest
if os.name == "nt":
raise SkipTest("Skipped on Windows.")
from netlib import encoding
import libmproxy.console.common as common
from libmproxy import utils, flow
import tutils

View File

@ -1,16 +1,12 @@
import os
from nose.plugins.skip import SkipTest
from libmproxy.exceptions import ContentViewException
from netlib.http import Headers
if os.name == "nt":
raise SkipTest("Skipped on Windows.")
import sys
import netlib.utils
from netlib import encoding
import libmproxy.console.contentview as cv
from libmproxy import utils, flow
import libmproxy.contentview as cv
import tutils
try:
@ -26,11 +22,11 @@ except:
class TestContentView:
def test_trailer(self):
txt = []
cv.trailer(5, txt, 1000)
assert not txt
cv.trailer(cv.VIEW_CUTOFF + 10, txt, cv.VIEW_CUTOFF)
assert txt
txt = "X"*10
lines = cv.trailer(txt, 1000)
assert not list(lines)
lines = cv.trailer(txt, 5)
assert list(lines)
def test_view_auto(self):
v = cv.ViewAuto()
@ -124,16 +120,16 @@ class TestContentView:
result = v([], 'a', 100)
if cssutils:
assert len(result[1]) == 0
assert len(list(result[1])) == 0
else:
assert len(result[1]) == 1
assert len(list(result[1])) == 1
result = v([], fixture_1, 100)
if cssutils:
assert len(result[1]) > 1
assert len(list(result[1])) > 1
else:
assert len(result[1]) == 1
assert len(list(result[1])) == 1
def test_view_hex(self):
v = cv.ViewHex()
@ -204,14 +200,15 @@ Larry
)
assert "Raw" in r[0]
r = cv.get_content_view(
tutils.raises(
ContentViewException,
cv.get_content_view,
cv.get("AMF"),
Headers(),
"[1, 2",
1000,
False
)
assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("Auto"),