From ec34cae6181d6af0150ac730d70b96104a07e9d5 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Tue, 31 May 2016 19:07:55 +1200 Subject: [PATCH] utils.multipartdecode -> http.multipart.decode also utils.parse_content_type -> http.headers.parse_content_type --- mitmproxy/contentviews.py | 9 ++--- mitmproxy/flow/export.py | 4 +-- netlib/http/headers.py | 27 ++++++++++++++ netlib/http/multipart.py | 32 +++++++++++++++++ netlib/http/request.py | 3 +- netlib/utils.py | 56 ------------------------------ test/netlib/http/test_headers.py | 10 ++++++ test/netlib/http/test_multipart.py | 23 ++++++++++++ test/netlib/test_utils.py | 32 ----------------- 9 files changed, 101 insertions(+), 95 deletions(-) create mode 100644 netlib/http/multipart.py create mode 100644 test/netlib/http/test_multipart.py diff --git a/mitmproxy/contentviews.py b/mitmproxy/contentviews.py index 75e4273fd..08a7e446f 100644 --- a/mitmproxy/contentviews.py +++ b/mitmproxy/contentviews.py @@ -27,8 +27,9 @@ import html2text import six from netlib.odict import ODict from netlib import encoding -from netlib.http import url -from netlib.utils import clean_bin, hexdump, multipartdecode, parse_content_type +import netlib.http.headers +from netlib.http import url, multipart +from netlib.utils import clean_bin, hexdump from . import utils from .exceptions import ContentViewException from .contrib import jsbeautifier @@ -121,7 +122,7 @@ class ViewAuto(View): headers = metadata.get("headers", {}) ctype = headers.get("content-type") if data and ctype: - ct = parse_content_type(ctype) if ctype else None + ct = netlib.http.headers.parse_content_type(ctype) if ctype else None ct = "%s/%s" % (ct[0], ct[1]) if ct in content_types_map: return content_types_map[ct][0](data, **metadata) @@ -275,7 +276,7 @@ class ViewMultipart(View): def __call__(self, data, **metadata): headers = metadata.get("headers", {}) - v = multipartdecode(headers, data) + v = multipart.decode(headers, data) if v: return "Multipart form", self._format(v) diff --git a/mitmproxy/flow/export.py b/mitmproxy/flow/export.py index d2c7bceb5..c2f54554d 100644 --- a/mitmproxy/flow/export.py +++ b/mitmproxy/flow/export.py @@ -5,7 +5,7 @@ from textwrap import dedent from six.moves.urllib.parse import quote, quote_plus import netlib.http -from netlib.utils import parse_content_type +import netlib.http.headers def curl_command(flow): @@ -88,7 +88,7 @@ def raw_request(flow): def is_json(headers, content): if headers: - ct = parse_content_type(headers.get("content-type", "")) + ct = netlib.http.headers.parse_content_type(headers.get("content-type", "")) if ct and "%s/%s" % (ct[0], ct[1]) == "application/json": try: return json.loads(content) diff --git a/netlib/http/headers.py b/netlib/http/headers.py index 6165fd616..8f669ec17 100644 --- a/netlib/http/headers.py +++ b/netlib/http/headers.py @@ -175,3 +175,30 @@ class Headers(MultiDict): fields.append([name, value]) self.fields = fields return replacements + + +def parse_content_type(c): + """ + A simple parser for content-type values. Returns a (type, subtype, + parameters) tuple, where type and subtype are strings, and parameters + is a dict. If the string could not be parsed, return None. + + E.g. the following string: + + text/html; charset=UTF-8 + + Returns: + + ("text", "html", {"charset": "UTF-8"}) + """ + parts = c.split(";", 1) + ts = parts[0].split("/", 1) + if len(ts) != 2: + return None + d = {} + if len(parts) == 2: + for i in parts[1].split(";"): + clause = i.split("=", 1) + if len(clause) == 2: + d[clause[0].strip()] = clause[1].strip() + return ts[0].lower(), ts[1].lower(), d diff --git a/netlib/http/multipart.py b/netlib/http/multipart.py new file mode 100644 index 000000000..a135eb863 --- /dev/null +++ b/netlib/http/multipart.py @@ -0,0 +1,32 @@ +import re + +from . import headers + + +def decode(hdrs, content): + """ + Takes a multipart boundary encoded string and returns list of (key, value) tuples. + """ + v = hdrs.get("content-type") + if v: + v = headers.parse_content_type(v) + if not v: + return [] + try: + boundary = v[2]["boundary"].encode("ascii") + except (KeyError, UnicodeError): + return [] + + rx = re.compile(br'\bname="([^"]+)"') + r = [] + + for i in content.split(b"--" + boundary): + parts = i.splitlines() + if len(parts) > 1 and parts[0][0:2] != b"--": + match = rx.search(parts[1]) + if match: + key = match.group(1) + value = b"".join(parts[3 + parts[2:].index(b""):]) + r.append((key, value)) + return r + return [] diff --git a/netlib/http/request.py b/netlib/http/request.py index d552bc70b..2fcea67dc 100644 --- a/netlib/http/request.py +++ b/netlib/http/request.py @@ -7,6 +7,7 @@ from six.moves import urllib from netlib import utils import netlib.http.url +from netlib.http import multipart from . import cookies from .. import encoding from ..multidict import MultiDictView @@ -369,7 +370,7 @@ class Request(Message): def _get_multipart_form(self): is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower() if is_valid_content_type: - return utils.multipartdecode(self.headers, self.content) + return multipart.decode(self.headers, self.content) return () def _set_multipart_form(self, value): diff --git a/netlib/utils.py b/netlib/utils.py index a2d8c97d0..a0150e779 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -190,62 +190,6 @@ def hostport(scheme, host, port): return "%s:%d" % (host, port) -def parse_content_type(c): - """ - A simple parser for content-type values. Returns a (type, subtype, - parameters) tuple, where type and subtype are strings, and parameters - is a dict. If the string could not be parsed, return None. - - E.g. the following string: - - text/html; charset=UTF-8 - - Returns: - - ("text", "html", {"charset": "UTF-8"}) - """ - parts = c.split(";", 1) - ts = parts[0].split("/", 1) - if len(ts) != 2: - return None - d = {} - if len(parts) == 2: - for i in parts[1].split(";"): - clause = i.split("=", 1) - if len(clause) == 2: - d[clause[0].strip()] = clause[1].strip() - return ts[0].lower(), ts[1].lower(), d - - -def multipartdecode(headers, content): - """ - Takes a multipart boundary encoded string and returns list of (key, value) tuples. - """ - v = headers.get("content-type") - if v: - v = parse_content_type(v) - if not v: - return [] - try: - boundary = v[2]["boundary"].encode("ascii") - except (KeyError, UnicodeError): - return [] - - rx = re.compile(br'\bname="([^"]+)"') - r = [] - - for i in content.split(b"--" + boundary): - parts = i.splitlines() - if len(parts) > 1 and parts[0][0:2] != b"--": - match = rx.search(parts[1]) - if match: - key = match.group(1) - value = b"".join(parts[3 + parts[2:].index(b""):]) - r.append((key, value)) - return r - return [] - - def safe_subn(pattern, repl, target, *args, **kwargs): """ There are Unicode conversion problems with re.subn. We try to smooth diff --git a/test/netlib/http/test_headers.py b/test/netlib/http/test_headers.py index cd2ca9d11..e12bceaf2 100644 --- a/test/netlib/http/test_headers.py +++ b/test/netlib/http/test_headers.py @@ -1,4 +1,5 @@ from netlib.http import Headers +from netlib.http.headers import parse_content_type from netlib.tutils import raises @@ -72,3 +73,12 @@ class TestHeaders(object): replacements = headers.replace(r"Host: ", "X-Host ") assert replacements == 0 assert headers["Host"] == "example.com" + + +def test_parse_content_type(): + p = parse_content_type + assert p("text/html") == ("text", "html", {}) + assert p("text") is None + + v = p("text/html; charset=UTF-8") + assert v == ('text', 'html', {'charset': 'UTF-8'}) diff --git a/test/netlib/http/test_multipart.py b/test/netlib/http/test_multipart.py new file mode 100644 index 000000000..45ae996b6 --- /dev/null +++ b/test/netlib/http/test_multipart.py @@ -0,0 +1,23 @@ +from netlib.http import Headers +from netlib.http import multipart + +def test_decode(): + boundary = 'somefancyboundary' + headers = Headers( + content_type='multipart/form-data; boundary=' + boundary + ) + content = ( + "--{0}\n" + "Content-Disposition: form-data; name=\"field1\"\n\n" + "value1\n" + "--{0}\n" + "Content-Disposition: form-data; name=\"field2\"\n\n" + "value2\n" + "--{0}--".format(boundary).encode() + ) + + form = multipart.decode(headers, content) + + assert len(form) == 2 + assert form[0] == (b"field1", b"value1") + assert form[1] == (b"field2", b"value2") diff --git a/test/netlib/test_utils.py b/test/netlib/test_utils.py index c4ee3c108..b3cc9a0b5 100644 --- a/test/netlib/test_utils.py +++ b/test/netlib/test_utils.py @@ -1,7 +1,6 @@ # coding=utf-8 from netlib import utils, tutils -from netlib.http import Headers def test_bidi(): @@ -38,37 +37,6 @@ def test_pretty_size(): assert utils.pretty_size(1024 * 1024) == "1MB" -def test_multipartdecode(): - boundary = 'somefancyboundary' - headers = Headers( - content_type='multipart/form-data; boundary=' + boundary - ) - content = ( - "--{0}\n" - "Content-Disposition: form-data; name=\"field1\"\n\n" - "value1\n" - "--{0}\n" - "Content-Disposition: form-data; name=\"field2\"\n\n" - "value2\n" - "--{0}--".format(boundary).encode() - ) - - form = utils.multipartdecode(headers, content) - - assert len(form) == 2 - assert form[0] == (b"field1", b"value1") - assert form[1] == (b"field2", b"value2") - - -def test_parse_content_type(): - p = utils.parse_content_type - assert p("text/html") == ("text", "html", {}) - assert p("text") is None - - v = p("text/html; charset=UTF-8") - assert v == ('text', 'html', {'charset': 'UTF-8'}) - - def test_safe_subn(): assert utils.safe_subn("foo", u"bar", "\xc2foo")