Refactor pretty view mechanism.

Also start adding unit tests for this subsystem.
This commit is contained in:
Aldo Cortesi 2012-03-24 14:02:41 +13:00
parent 0d05068f91
commit 62e51018d0
5 changed files with 235 additions and 85 deletions

View File

@ -1,6 +1,6 @@
import urwid
import common
from .. import utils, encoding
from .. import utils, encoding, flow
VIEW_CUTOFF = 1024*100
@ -20,19 +20,18 @@ VIEW_CONTENT_PRETTY_TYPE_XML = 2
VIEW_CONTENT_PRETTY_TYPE_URLENCODED = 3
CONTENT_PRETTY_NAMES = {
VIEW_CONTENT_PRETTY_TYPE_JSON: "json",
VIEW_CONTENT_PRETTY_TYPE_XML: "xmlish",
VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "urlencoded"
VIEW_CONTENT_PRETTY_TYPE_JSON: "JSON",
VIEW_CONTENT_PRETTY_TYPE_XML: "XML",
VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "URL-encoded"
}
CONTENT_PRETTY_TYPES = {
CONTENT_TYPES_MAP = {
"text/html": VIEW_CONTENT_PRETTY_TYPE_XML,
"application/json": VIEW_CONTENT_PRETTY_TYPE_JSON,
"text/xml": VIEW_CONTENT_PRETTY_TYPE_XML,
"multipart/form-data": VIEW_CONTENT_PRETTY_TYPE_URLENCODED
}
def trailer(clen, txt):
rem = clen - VIEW_CUTOFF
if rem > 0:
@ -45,16 +44,18 @@ def trailer(clen, txt):
)
)
def view_flow_raw(content):
def view_raw(hdrs, content):
txt = []
for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines():
txt.append(
urwid.Text(("text", i))
)
trailer(len(content), txt)
return txt
return "Raw", txt
def view_flow_binary(content):
def view_hex(hdrs, content):
txt = []
for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]):
txt.append(urwid.Text([
@ -65,31 +66,37 @@ def view_flow_binary(content):
("text", s),
]))
trailer(len(content), txt)
return txt
return "HEX", txt
def view_flow_xmlish(content):
def view_xmlish(hdrs, content):
txt = []
for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]):
txt.append(
urwid.Text(("text", i)),
)
trailer(len(content), txt)
return txt
return "XML-like data", txt
def view_flow_json(lines):
txt = []
sofar = 0
for i in lines:
sofar += len(i)
txt.append(
urwid.Text(("text", i)),
)
if sofar > VIEW_CUTOFF:
break
trailer(sum(len(i) for i in lines), txt)
return txt
def view_flow_formdata(content, boundary):
def view_json(hdrs, content):
lines = utils.pretty_json(content)
if lines:
txt = []
sofar = 0
for i in lines:
sofar += len(i)
txt.append(
urwid.Text(("text", i)),
)
if sofar > VIEW_CUTOFF:
break
trailer(sum(len(i) for i in lines), txt)
return "JSON", txt
# FIXME
def view_formdata(content, boundary):
rx = re.compile(r'\bname="([^"]+)"')
keys = []
vals = []
@ -113,65 +120,66 @@ def view_flow_formdata(content, boundary):
))
return r
def view_flow_urlencoded(lines):
return common.format_keyvals(
[(k+":", v) for (k, v) in lines],
key = "header",
val = "text"
)
def find_pretty_view(content, hdrItems, pretty_type=VIEW_CONTENT_PRETTY_TYPE_AUTO):
ctype = None
if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO:
pretty_type == None
for i in hdrItems:
if i[0].lower() == "content-type":
ctype = i[1]
break
ct = utils.parse_content_type(ctype) if ctype else None
if ct:
pretty_type = CONTENT_PRETTY_TYPES.get("%s/%s"%(ct[0], ct[1]))
if not pretty_type and utils.isXML(content):
pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML
if pretty_type == VIEW_CONTENT_PRETTY_TYPE_URLENCODED:
data = utils.urldecode(content)
if data:
return "URLEncoded form", view_flow_urlencoded(data)
if pretty_type == VIEW_CONTENT_PRETTY_TYPE_XML:
return "Indented XML-ish", view_flow_xmlish(content)
if pretty_type == VIEW_CONTENT_PRETTY_TYPE_JSON:
lines = utils.pretty_json(content)
if lines:
return "JSON", view_flow_json(lines)
return "Falling back to raw.", view_flow_raw(content)
def view_urlencoded(hdrs, content):
lines = utils.urldecode(content)
if lines:
body = common.format_keyvals(
[(k+":", v) for (k, v) in lines],
key = "header",
val = "text"
)
return "URLEncoded form", body
def get_content_view(viewmode, pretty_type, enc, content, hdrItems):
PRETTY_FUNCTION_MAP = {
VIEW_CONTENT_PRETTY_TYPE_XML: view_xmlish,
VIEW_CONTENT_PRETTY_TYPE_JSON: view_json,
VIEW_CONTENT_PRETTY_TYPE_URLENCODED: view_urlencoded
}
def get_view_func(viewmode, pretty_type, hdrs, content):
"""
Returns a function object.
"""
if viewmode == VIEW_CONTENT_HEX:
return view_hex
elif viewmode == VIEW_CONTENT_RAW:
return view_raw
else:
if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO:
ctype = hdrs.get("content-type")
if ctype:
ctype = ctype[0]
ct = utils.parse_content_type(ctype) if ctype else None
if ct:
pretty_type = CONTENT_TYPES_MAP.get("%s/%s"%(ct[0], ct[1]))
if not pretty_type and utils.isXML(content):
pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML
return PRETTY_FUNCTION_MAP.get(pretty_type, view_raw)
def get_content_view(viewmode, pretty_type, hdrItems, content):
"""
Returns a (msg, body) tuple.
"""
msg = ""
if viewmode == VIEW_CONTENT_HEX:
body = view_flow_binary(content)
elif viewmode == VIEW_CONTENT_PRETTY:
emsg = ""
if enc:
decoded = encoding.decode(enc, content)
if decoded:
content = decoded
if enc and enc != "identity":
emsg = "[decoded %s]"%enc
msg, body = find_pretty_view(content, hdrItems, pretty_type)
if pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO:
emsg += " (forced to %s)"%(CONTENT_PRETTY_NAMES[pretty_type])
if emsg:
msg = emsg + " " + msg
else:
body = view_flow_raw(content)
return msg, body
msg = []
hdrs = flow.ODictCaseless([list(i) for i in hdrItems])
enc = hdrs.get("content-encoding")
if enc and enc[0] != "identity":
decoded = encoding.decode(enc[0], content)
if decoded:
content = decoded
msg.append("[decoded %s]"%enc[0])
if viewmode == VIEW_CONTENT_PRETTY and pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO:
msg.append("[forced to %s]"%(CONTENT_PRETTY_NAMES[pretty_type]))
func = get_view_func(viewmode, pretty_type, hdrs, content)
ret = func(hdrs, content)
if not ret:
ret = view_raw(hdrs, content)
msg.append(ret[0])
return " ".join(msg), ret[1]

View File

@ -116,15 +116,14 @@ class FlowView(common.WWrap):
else:
self.view_request()
def _cached_conn_text(self, e, content, hdrItems, viewmode, pretty_type):
def _cached_conn_text(self, content, hdrItems, viewmode, pretty_type):
txt = common.format_keyvals(
[(h+":", v) for (h, v) in hdrItems],
key = "header",
val = "text"
)
if content:
msg, body = contentview.get_content_view(viewmode, pretty_type, e, content, hdrItems)
msg, body = contentview.get_content_view(viewmode, pretty_type, hdrItems, content)
title = urwid.AttrWrap(urwid.Columns([
urwid.Text(
[
@ -180,11 +179,8 @@ class FlowView(common.WWrap):
return f
def _conn_text(self, conn, viewmode, pretty_type):
e = conn.headers["content-encoding"]
e = e[0] if e else None
return cache.callback(
self, "_cached_conn_text",
e,
conn.content,
tuple(tuple(i) for i in conn.headers.lst),
viewmode,

View File

@ -181,6 +181,12 @@ class ODict:
def add(self, key, value):
self.lst.append([key, str(value)])
def get(self, k, d=None):
if k in self:
return self[k]
else:
return d
def _get_state(self):
return [tuple(i) for i in self.lst]

View File

@ -0,0 +1,135 @@
import libpry
import libmproxy.console.contentview as cv
from libmproxy import utils, flow, encoding
class uContentView(libpry.AutoTree):
def test_trailer(self):
txt = []
cv.trailer(5, txt)
assert not txt
cv.trailer(cv.VIEW_CUTOFF + 10, txt)
assert txt
def test_get_view_func(self):
f = cv.get_view_func(
cv.VIEW_CONTENT_HEX,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
flow.ODictCaseless(),
"foo"
)
assert f is cv.view_hex
f = cv.get_view_func(
cv.VIEW_CONTENT_RAW,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
flow.ODictCaseless(),
"foo"
)
assert f is cv.view_raw
f = cv.get_view_func(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
flow.ODictCaseless(
[["content-type", "text/html"]],
),
"foo"
)
assert f is cv.view_xmlish
f = cv.get_view_func(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
flow.ODictCaseless(
[["content-type", "text/flibble"]],
),
"foo"
)
assert f is cv.view_raw
f = cv.get_view_func(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
flow.ODictCaseless(
[["content-type", "text/flibble"]],
),
"<xml></xml>"
)
assert f is cv.view_xmlish
def test_view_urlencoded(self):
d = utils.urlencode([("one", "two"), ("three", "four")])
assert cv.view_urlencoded([], d)
assert not cv.view_urlencoded([], "foo")
def test_view_json(self):
cv.VIEW_CUTOFF = 100
assert cv.view_json([], "{}")
assert not cv.view_urlencoded([], "{")
assert cv.view_json([], "[" + ",".join(["0"]*cv.VIEW_CUTOFF) + "]")
def test_view_xmlish(self):
assert cv.view_xmlish([], "<foo></foo>")
assert cv.view_xmlish([], "<foo>")
def test_view_raw(self):
assert cv.view_raw([], "foo")
def test_view_raw(self):
assert cv.view_hex([], "foo")
def test_get_content_view(self):
r = cv.get_content_view(
cv.VIEW_CONTENT_RAW,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
[["content-type", "application/json"]],
"[1, 2, 3]"
)
assert r[0] == "Raw"
r = cv.get_content_view(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
[["content-type", "application/json"]],
"[1, 2, 3]"
)
assert r[0] == "JSON"
r = cv.get_content_view(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
[["content-type", "application/json"]],
"[1, 2"
)
assert r[0] == "Raw"
r = cv.get_content_view(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
[
["content-type", "application/json"],
["content-encoding", "gzip"]
],
encoding.encode('gzip', "[1, 2, 3]")
)
assert "decoded gzip" in r[0]
assert "JSON" in r[0]
r = cv.get_content_view(
cv.VIEW_CONTENT_PRETTY,
cv.VIEW_CONTENT_PRETTY_TYPE_XML,
[
["content-type", "application/json"],
["content-encoding", "gzip"]
],
encoding.encode('gzip', "[1, 2, 3]")
)
assert "decoded gzip" in r[0]
assert "forced" in r[0]
tests = [
uContentView()
]

View File

@ -1001,6 +1001,11 @@ class uODict(libpry.AutoTree):
["two", "vun"],
]
def test_get(self):
self.od.add("one", "two")
assert self.od.get("one") == ["two"]
assert self.od.get("two") == None
class uODictCaseless(libpry.AutoTree):
def setUp(self):