utils.multipartdecode -> http.multipart.decode

also

utils.parse_content_type -> http.headers.parse_content_type
This commit is contained in:
Aldo Cortesi 2016-05-31 19:07:55 +12:00
parent 15b2374ef9
commit ec34cae618
9 changed files with 101 additions and 95 deletions

View File

@ -27,8 +27,9 @@ import html2text
import six import six
from netlib.odict import ODict from netlib.odict import ODict
from netlib import encoding from netlib import encoding
from netlib.http import url import netlib.http.headers
from netlib.utils import clean_bin, hexdump, multipartdecode, parse_content_type from netlib.http import url, multipart
from netlib.utils import clean_bin, hexdump
from . import utils from . import utils
from .exceptions import ContentViewException from .exceptions import ContentViewException
from .contrib import jsbeautifier from .contrib import jsbeautifier
@ -121,7 +122,7 @@ class ViewAuto(View):
headers = metadata.get("headers", {}) headers = metadata.get("headers", {})
ctype = headers.get("content-type") ctype = headers.get("content-type")
if data and ctype: if data and ctype:
ct = parse_content_type(ctype) if ctype else None ct = netlib.http.headers.parse_content_type(ctype) if ctype else None
ct = "%s/%s" % (ct[0], ct[1]) ct = "%s/%s" % (ct[0], ct[1])
if ct in content_types_map: if ct in content_types_map:
return content_types_map[ct][0](data, **metadata) return content_types_map[ct][0](data, **metadata)
@ -275,7 +276,7 @@ class ViewMultipart(View):
def __call__(self, data, **metadata): def __call__(self, data, **metadata):
headers = metadata.get("headers", {}) headers = metadata.get("headers", {})
v = multipartdecode(headers, data) v = multipart.decode(headers, data)
if v: if v:
return "Multipart form", self._format(v) return "Multipart form", self._format(v)

View File

@ -5,7 +5,7 @@ from textwrap import dedent
from six.moves.urllib.parse import quote, quote_plus from six.moves.urllib.parse import quote, quote_plus
import netlib.http import netlib.http
from netlib.utils import parse_content_type import netlib.http.headers
def curl_command(flow): def curl_command(flow):
@ -88,7 +88,7 @@ def raw_request(flow):
def is_json(headers, content): def is_json(headers, content):
if headers: if headers:
ct = parse_content_type(headers.get("content-type", "")) ct = netlib.http.headers.parse_content_type(headers.get("content-type", ""))
if ct and "%s/%s" % (ct[0], ct[1]) == "application/json": if ct and "%s/%s" % (ct[0], ct[1]) == "application/json":
try: try:
return json.loads(content) return json.loads(content)

View File

@ -175,3 +175,30 @@ class Headers(MultiDict):
fields.append([name, value]) fields.append([name, value])
self.fields = fields self.fields = fields
return replacements return replacements
def parse_content_type(c):
    """
    A simple parser for content-type values.

    Returns a (type, subtype, parameters) tuple, where type and subtype are
    lower-cased strings with surrounding whitespace removed, and parameters
    is a dict mapping parameter names to values (both stripped of
    surrounding whitespace). If the string has no "type/subtype" part,
    return None.

    E.g. the following string:

        text/html; charset=UTF-8

    Returns:

        ("text", "html", {"charset": "UTF-8"})
    """
    media_type, _, raw_params = c.partition(";")
    ts = media_type.split("/", 1)
    if len(ts) != 2:
        return None
    d = {}
    for clause in raw_params.split(";"):
        name, sep, value = clause.partition("=")
        # Clauses without "=" carry no key/value pair and are ignored.
        if sep:
            d[name.strip()] = value.strip()
    # Strip so that optional whitespace around the media type (e.g.
    # "text/html ; charset=UTF-8") does not leak into the result.
    return ts[0].strip().lower(), ts[1].strip().lower(), d

32
netlib/http/multipart.py Normal file
View File

@ -0,0 +1,32 @@
import re
from . import headers
def decode(hdrs, content):
    """
    Takes a multipart boundary encoded string and returns list of (key, value) tuples.

    hdrs: header mapping; only its "content-type" entry is read, to find the
        boundary parameter.
    content: the raw message body as bytes.

    Returns an empty list if there is no content-type header, it cannot be
    parsed, or it has no ASCII-encodable boundary parameter. Parts without a
    name="..." attribute, or without an empty line separating their headers
    from their body, are skipped.
    """
    v = hdrs.get("content-type")
    if not v:
        return []
    v = headers.parse_content_type(v)
    if not v:
        return []
    try:
        # RFC 2046 allows the boundary to be given as a quoted string; the
        # quotes are not part of the delimiter itself.
        boundary = v[2]["boundary"].strip('"').encode("ascii")
    except (KeyError, UnicodeError):
        return []

    rx = re.compile(br'\bname="([^"]+)"')
    r = []
    for i in content.split(b"--" + boundary):
        parts = i.splitlines()
        # parts[0] is the remainder of the delimiter line; a leading "--"
        # there marks the closing delimiter.
        if len(parts) <= 1 or parts[0][0:2] == b"--":
            continue
        match = rx.search(parts[1])
        if not match:
            continue
        try:
            # The body starts after the first empty line following parts[1].
            body_start = 3 + parts[2:].index(b"")
        except ValueError:
            # Malformed part with no header/body separator: skip it rather
            # than letting the ValueError escape to the caller.
            continue
        r.append((match.group(1), b"".join(parts[body_start:])))
    return r

View File

@ -7,6 +7,7 @@ from six.moves import urllib
from netlib import utils from netlib import utils
import netlib.http.url import netlib.http.url
from netlib.http import multipart
from . import cookies from . import cookies
from .. import encoding from .. import encoding
from ..multidict import MultiDictView from ..multidict import MultiDictView
@ -369,7 +370,7 @@ class Request(Message):
def _get_multipart_form(self): def _get_multipart_form(self):
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower() is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
if is_valid_content_type: if is_valid_content_type:
return utils.multipartdecode(self.headers, self.content) return multipart.decode(self.headers, self.content)
return () return ()
def _set_multipart_form(self, value): def _set_multipart_form(self, value):

View File

@ -190,62 +190,6 @@ def hostport(scheme, host, port):
return "%s:%d" % (host, port) return "%s:%d" % (host, port)
def parse_content_type(c):
    """
    A simple parser for content-type values.

    Returns a (type, subtype, parameters) tuple, where type and subtype are
    lower-cased strings with surrounding whitespace removed, and parameters
    is a dict mapping parameter names to values (both stripped of
    surrounding whitespace). If the string has no "type/subtype" part,
    return None.

    E.g. the following string:

        text/html; charset=UTF-8

    Returns:

        ("text", "html", {"charset": "UTF-8"})
    """
    media_type, _, raw_params = c.partition(";")
    ts = media_type.split("/", 1)
    if len(ts) != 2:
        return None
    d = {}
    for clause in raw_params.split(";"):
        name, sep, value = clause.partition("=")
        # Clauses without "=" carry no key/value pair and are ignored.
        if sep:
            d[name.strip()] = value.strip()
    # Strip so that optional whitespace around the media type (e.g.
    # "text/html ; charset=UTF-8") does not leak into the result.
    return ts[0].strip().lower(), ts[1].strip().lower(), d
def multipartdecode(headers, content):
    """
    Takes a multipart boundary encoded string and returns list of (key, value) tuples.

    headers: header mapping; only its "content-type" entry is read, to find
        the boundary parameter.
    content: the raw message body as bytes.

    Returns an empty list if there is no content-type header, it cannot be
    parsed, or it has no ASCII-encodable boundary parameter. Parts without a
    name="..." attribute, or without an empty line separating their headers
    from their body, are skipped.
    """
    v = headers.get("content-type")
    if not v:
        return []
    v = parse_content_type(v)
    if not v:
        return []
    try:
        # RFC 2046 allows the boundary to be given as a quoted string; the
        # quotes are not part of the delimiter itself.
        boundary = v[2]["boundary"].strip('"').encode("ascii")
    except (KeyError, UnicodeError):
        return []

    rx = re.compile(br'\bname="([^"]+)"')
    r = []
    for i in content.split(b"--" + boundary):
        parts = i.splitlines()
        # parts[0] is the remainder of the delimiter line; a leading "--"
        # there marks the closing delimiter.
        if len(parts) <= 1 or parts[0][0:2] == b"--":
            continue
        match = rx.search(parts[1])
        if not match:
            continue
        try:
            # The body starts after the first empty line following parts[1].
            body_start = 3 + parts[2:].index(b"")
        except ValueError:
            # Malformed part with no header/body separator: skip it rather
            # than letting the ValueError escape to the caller.
            continue
        r.append((match.group(1), b"".join(parts[body_start:])))
    return r
def safe_subn(pattern, repl, target, *args, **kwargs): def safe_subn(pattern, repl, target, *args, **kwargs):
""" """
There are Unicode conversion problems with re.subn. We try to smooth There are Unicode conversion problems with re.subn. We try to smooth

View File

@ -1,4 +1,5 @@
from netlib.http import Headers from netlib.http import Headers
from netlib.http.headers import parse_content_type
from netlib.tutils import raises from netlib.tutils import raises
@ -72,3 +73,12 @@ class TestHeaders(object):
replacements = headers.replace(r"Host: ", "X-Host ") replacements = headers.replace(r"Host: ", "X-Host ")
assert replacements == 0 assert replacements == 0
assert headers["Host"] == "example.com" assert headers["Host"] == "example.com"
def test_parse_content_type():
    """parse_content_type splits a value into (type, subtype, parameters)."""
    # No parameters: the parameter dict is empty.
    assert parse_content_type("text/html") == ("text", "html", {})
    # No "/" separator: the value is unparseable.
    assert parse_content_type("text") is None
    # A single key=value parameter after the media type.
    parsed = parse_content_type("text/html; charset=UTF-8")
    assert parsed == ('text', 'html', {'charset': 'UTF-8'})

View File

@ -0,0 +1,23 @@
from netlib.http import Headers
from netlib.http import multipart
def test_decode():
    """multipart.decode yields the form fields, in order, as byte pairs."""
    boundary = 'somefancyboundary'
    headers = Headers(
        content_type='multipart/form-data; boundary=' + boundary
    )
    # Two fields, each separated from its value by an empty line, followed
    # by the closing delimiter.
    template = (
        "--{0}\n"
        "Content-Disposition: form-data; name=\"field1\"\n\n"
        "value1\n"
        "--{0}\n"
        "Content-Disposition: form-data; name=\"field2\"\n\n"
        "value2\n"
        "--{0}--"
    )
    content = template.format(boundary).encode()

    fields = multipart.decode(headers, content)

    expected = [(b"field1", b"value1"), (b"field2", b"value2")]
    assert len(fields) == len(expected)
    for got, want in zip(fields, expected):
        assert got == want

View File

@ -1,7 +1,6 @@
# coding=utf-8 # coding=utf-8
from netlib import utils, tutils from netlib import utils, tutils
from netlib.http import Headers
def test_bidi(): def test_bidi():
@ -38,37 +37,6 @@ def test_pretty_size():
assert utils.pretty_size(1024 * 1024) == "1MB" assert utils.pretty_size(1024 * 1024) == "1MB"
def test_multipartdecode():
    """utils.multipartdecode yields the form fields, in order, as byte pairs."""
    boundary = 'somefancyboundary'
    headers = Headers(
        content_type='multipart/form-data; boundary=' + boundary
    )
    # Two fields, each separated from its value by an empty line, followed
    # by the closing delimiter.
    template = (
        "--{0}\n"
        "Content-Disposition: form-data; name=\"field1\"\n\n"
        "value1\n"
        "--{0}\n"
        "Content-Disposition: form-data; name=\"field2\"\n\n"
        "value2\n"
        "--{0}--"
    )
    content = template.format(boundary).encode()

    fields = utils.multipartdecode(headers, content)

    expected = [(b"field1", b"value1"), (b"field2", b"value2")]
    assert len(fields) == len(expected)
    for got, want in zip(fields, expected):
        assert got == want
def test_parse_content_type():
    """utils.parse_content_type splits a value into (type, subtype, parameters)."""
    # No parameters: the parameter dict is empty.
    assert utils.parse_content_type("text/html") == ("text", "html", {})
    # No "/" separator: the value is unparseable.
    assert utils.parse_content_type("text") is None
    # A single key=value parameter after the media type.
    parsed = utils.parse_content_type("text/html; charset=UTF-8")
    assert parsed == ('text', 'html', {'charset': 'UTF-8'})
def test_safe_subn(): def test_safe_subn():
assert utils.safe_subn("foo", u"bar", "\xc2foo") assert utils.safe_subn("foo", u"bar", "\xc2foo")