diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py index 446eb350d..c6e43a27f 100644 --- a/libmproxy/console/contentview.py +++ b/libmproxy/console/contentview.py @@ -1,6 +1,6 @@ import urwid import common -from .. import utils, encoding +from .. import utils, encoding, flow VIEW_CUTOFF = 1024*100 @@ -20,19 +20,18 @@ VIEW_CONTENT_PRETTY_TYPE_XML = 2 VIEW_CONTENT_PRETTY_TYPE_URLENCODED = 3 CONTENT_PRETTY_NAMES = { - VIEW_CONTENT_PRETTY_TYPE_JSON: "json", - VIEW_CONTENT_PRETTY_TYPE_XML: "xmlish", - VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "urlencoded" + VIEW_CONTENT_PRETTY_TYPE_JSON: "JSON", + VIEW_CONTENT_PRETTY_TYPE_XML: "XML", + VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "URL-encoded" } -CONTENT_PRETTY_TYPES = { +CONTENT_TYPES_MAP = { "text/html": VIEW_CONTENT_PRETTY_TYPE_XML, "application/json": VIEW_CONTENT_PRETTY_TYPE_JSON, "text/xml": VIEW_CONTENT_PRETTY_TYPE_XML, "multipart/form-data": VIEW_CONTENT_PRETTY_TYPE_URLENCODED } - def trailer(clen, txt): rem = clen - VIEW_CUTOFF if rem > 0: @@ -45,16 +44,18 @@ def trailer(clen, txt): ) ) -def view_flow_raw(content): + +def view_raw(hdrs, content): txt = [] for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): txt.append( urwid.Text(("text", i)) ) trailer(len(content), txt) - return txt + return "Raw", txt -def view_flow_binary(content): + +def view_hex(hdrs, content): txt = [] for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): txt.append(urwid.Text([ @@ -65,31 +66,37 @@ def view_flow_binary(content): ("text", s), ])) trailer(len(content), txt) - return txt + return "HEX", txt -def view_flow_xmlish(content): + +def view_xmlish(hdrs, content): txt = [] for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): txt.append( urwid.Text(("text", i)), ) trailer(len(content), txt) - return txt + return "XML-like data", txt -def view_flow_json(lines): - txt = [] - sofar = 0 - for i in lines: - sofar += len(i) - txt.append( - urwid.Text(("text", i)), - ) - if sofar > VIEW_CUTOFF: - break - trailer(sum(len(i) for i in lines), txt) - return txt -def view_flow_formdata(content, boundary): +def view_json(hdrs, content): + lines = utils.pretty_json(content) + if lines: + txt = [] + sofar = 0 + for i in lines: + sofar += len(i) + txt.append( + urwid.Text(("text", i)), + ) + if sofar > VIEW_CUTOFF: + break + trailer(sum(len(i) for i in lines), txt) + return "JSON", txt + + +# FIXME +def view_formdata(content, boundary): rx = re.compile(r'\bname="([^"]+)"') keys = [] vals = [] @@ -113,65 +120,66 @@ def view_flow_formdata(content, boundary): )) return r -def view_flow_urlencoded(lines): - return common.format_keyvals( - [(k+":", v) for (k, v) in lines], - key = "header", - val = "text" - ) -def find_pretty_view(content, hdrItems, pretty_type=VIEW_CONTENT_PRETTY_TYPE_AUTO): - ctype = None - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO: - pretty_type == None - for i in hdrItems: - if i[0].lower() == "content-type": - ctype = i[1] - break - ct = utils.parse_content_type(ctype) if ctype else None - if ct: - pretty_type = CONTENT_PRETTY_TYPES.get("%s/%s"%(ct[0], ct[1])) - if not pretty_type and utils.isXML(content): - pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML - - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_URLENCODED: - data = utils.urldecode(content) - if data: - return "URLEncoded form", view_flow_urlencoded(data) - - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_XML: - return "Indented XML-ish", view_flow_xmlish(content) - - if pretty_type == VIEW_CONTENT_PRETTY_TYPE_JSON: - lines = utils.pretty_json(content) - if lines: - return "JSON", view_flow_json(lines) - - return "Falling back to raw.", view_flow_raw(content) +def view_urlencoded(hdrs, content): + lines = utils.urldecode(content) + if lines: + body = common.format_keyvals( + [(k+":", v) for (k, v) in lines], + key = "header", + val = "text" + ) + return "URLEncoded form", body -def get_content_view(viewmode, pretty_type, enc, content, hdrItems): +PRETTY_FUNCTION_MAP = { + VIEW_CONTENT_PRETTY_TYPE_XML: view_xmlish, + VIEW_CONTENT_PRETTY_TYPE_JSON: view_json, + VIEW_CONTENT_PRETTY_TYPE_URLENCODED: view_urlencoded +} + +def get_view_func(viewmode, pretty_type, hdrs, content): + """ + Returns a function object. + """ + if viewmode == VIEW_CONTENT_HEX: + return view_hex + elif viewmode == VIEW_CONTENT_RAW: + return view_raw + else: + if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO: + ctype = hdrs.get("content-type") + if ctype: + ctype = ctype[0] + ct = utils.parse_content_type(ctype) if ctype else None + if ct: + pretty_type = CONTENT_TYPES_MAP.get("%s/%s"%(ct[0], ct[1])) + if not pretty_type and utils.isXML(content): + pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML + return PRETTY_FUNCTION_MAP.get(pretty_type, view_raw) + + +def get_content_view(viewmode, pretty_type, hdrItems, content): """ Returns a (msg, body) tuple. """ - msg = "" - if viewmode == VIEW_CONTENT_HEX: - body = view_flow_binary(content) - elif viewmode == VIEW_CONTENT_PRETTY: - emsg = "" - if enc: - decoded = encoding.decode(enc, content) - if decoded: - content = decoded - if enc and enc != "identity": - emsg = "[decoded %s]"%enc - msg, body = find_pretty_view(content, hdrItems, pretty_type) - if pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO: - emsg += " (forced to %s)"%(CONTENT_PRETTY_NAMES[pretty_type]) - if emsg: - msg = emsg + " " + msg - else: - body = view_flow_raw(content) - return msg, body + msg = [] + hdrs = flow.ODictCaseless([list(i) for i in hdrItems]) + enc = hdrs.get("content-encoding") + if enc and enc[0] != "identity": + decoded = encoding.decode(enc[0], content) + if decoded: + content = decoded + msg.append("[decoded %s]"%enc[0]) + + if viewmode == VIEW_CONTENT_PRETTY and pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO: + msg.append("[forced to %s]"%(CONTENT_PRETTY_NAMES[pretty_type])) + func = get_view_func(viewmode, pretty_type, hdrs, content) + + ret = func(hdrs, content) + if not ret: + ret = view_raw(hdrs, content) + msg.append(ret[0]) + return " ".join(msg), ret[1] diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 95fc6d67f..3a6f02d57 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -116,15 +116,14 @@ class FlowView(common.WWrap): else: self.view_request() - def _cached_conn_text(self, e, content, hdrItems, viewmode, pretty_type): + def _cached_conn_text(self, content, hdrItems, viewmode, pretty_type): txt = common.format_keyvals( [(h+":", v) for (h, v) in hdrItems], key = "header", val = "text" ) - if content: - msg, body = contentview.get_content_view(viewmode, pretty_type, e, content, hdrItems) + msg, body = contentview.get_content_view(viewmode, pretty_type, hdrItems, content) title = urwid.AttrWrap(urwid.Columns([ urwid.Text( [ @@ -180,11 +179,8 @@ class FlowView(common.WWrap): return f def _conn_text(self, conn, viewmode, pretty_type): - e = conn.headers["content-encoding"] - e = e[0] if e else None return cache.callback( self, "_cached_conn_text", - e, conn.content, tuple(tuple(i) for i in conn.headers.lst), viewmode, diff --git a/libmproxy/flow.py b/libmproxy/flow.py index 5ff12b5bf..bb079a8d1 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -181,6 +181,12 @@ class ODict: def add(self, key, value): self.lst.append([key, str(value)]) + def get(self, k, d=None): + if k in self: + return self[k] + else: + return d + def _get_state(self): return [tuple(i) for i in self.lst] diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py new file mode 100644 index 000000000..070094ffd --- /dev/null +++ b/test/test_console_contentview.py @@ -0,0 +1,135 @@ +import libpry +import libmproxy.console.contentview as cv +from libmproxy import utils, flow, encoding + +class uContentView(libpry.AutoTree): + def test_trailer(self): + txt = [] + cv.trailer(5, txt) + assert not txt + cv.trailer(cv.VIEW_CUTOFF + 10, txt) + assert txt + + def test_get_view_func(self): + f = cv.get_view_func( + cv.VIEW_CONTENT_HEX, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + flow.ODictCaseless(), + "foo" + ) + assert f is cv.view_hex + + f = cv.get_view_func( + cv.VIEW_CONTENT_RAW, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + flow.ODictCaseless(), + "foo" + ) + assert f is cv.view_raw + + f = cv.get_view_func( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + flow.ODictCaseless( + [["content-type", "text/html"]], + ), + "foo" + ) + assert f is cv.view_xmlish + + f = cv.get_view_func( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + flow.ODictCaseless( + [["content-type", "text/flibble"]], + ), + "foo" + ) + assert f is cv.view_raw + + f = cv.get_view_func( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + flow.ODictCaseless( + [["content-type", "text/flibble"]], + ), + "" + ) + assert f is cv.view_xmlish + + def test_view_urlencoded(self): + d = utils.urlencode([("one", "two"), ("three", "four")]) + assert cv.view_urlencoded([], d) + assert not cv.view_urlencoded([], "foo") + + def test_view_json(self): + cv.VIEW_CUTOFF = 100 + assert cv.view_json([], "{}") + assert not cv.view_urlencoded([], "{") + assert cv.view_json([], "[" + ",".join(["0"]*cv.VIEW_CUTOFF) + "]") + + def test_view_xmlish(self): + assert cv.view_xmlish([], "") + assert cv.view_xmlish([], "") + + def test_view_raw(self): + assert cv.view_raw([], "foo") + + def test_view_raw(self): + assert cv.view_hex([], "foo") + + def test_get_content_view(self): + r = cv.get_content_view( + cv.VIEW_CONTENT_RAW, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + [["content-type", "application/json"]], + "[1, 2, 3]" + ) + assert r[0] == "Raw" + + r = cv.get_content_view( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + [["content-type", "application/json"]], + "[1, 2, 3]" + ) + assert r[0] == "JSON" + + + r = cv.get_content_view( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + [["content-type", "application/json"]], + "[1, 2" + ) + assert r[0] == "Raw" + + r = cv.get_content_view( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_AUTO, + [ + ["content-type", "application/json"], + ["content-encoding", "gzip"] + ], + encoding.encode('gzip', "[1, 2, 3]") + ) + assert "decoded gzip" in r[0] + assert "JSON" in r[0] + + + r = cv.get_content_view( + cv.VIEW_CONTENT_PRETTY, + cv.VIEW_CONTENT_PRETTY_TYPE_XML, + [ + ["content-type", "application/json"], + ["content-encoding", "gzip"] + ], + encoding.encode('gzip', "[1, 2, 3]") + ) + assert "decoded gzip" in r[0] + assert "forced" in r[0] + + +tests = [ + uContentView() +] diff --git a/test/test_flow.py b/test/test_flow.py index 8a7da05cf..e44e2b0df 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -1001,6 +1001,11 @@ class uODict(libpry.AutoTree): ["two", "vun"], ] + def test_get(self): + self.od.add("one", "two") + assert self.od.get("one") == ["two"] + assert self.od.get("two") == None + class uODictCaseless(libpry.AutoTree): def setUp(self):