mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
move code from mitmproxy to netlib
This commit is contained in:
parent
199f2a44fe
commit
a837230320
82
netlib/encoding.py
Normal file
82
netlib/encoding.py
Normal file
@ -0,0 +1,82 @@
|
||||
"""
|
||||
Utility functions for decoding response bodies.
|
||||
"""
|
||||
from __future__ import absolute_import
|
||||
import cStringIO
|
||||
import gzip
|
||||
import zlib
|
||||
|
||||
__ALL__ = ["ENCODINGS"]
|
||||
|
||||
ENCODINGS = set(["identity", "gzip", "deflate"])
|
||||
|
||||
|
||||
def decode(e, content):
    """
    Decode *content* with the named Content-Encoding.

    Returns the decoded body, or None if the encoding is unknown.
    """
    decoder = {
        "identity": identity,
        "gzip": decode_gzip,
        "deflate": decode_deflate,
    }.get(e)
    if decoder is None:
        return None
    return decoder(content)
|
||||
|
||||
|
||||
def encode(e, content):
    """
    Encode *content* with the named Content-Encoding.

    Returns the encoded body, or None if the encoding is unknown.
    """
    encoder = {
        "identity": identity,
        "gzip": encode_gzip,
        "deflate": encode_deflate,
    }.get(e)
    if encoder is None:
        return None
    return encoder(content)
|
||||
|
||||
|
||||
def identity(content):
    """
    Return *content* unchanged.

    "identity" is the default value of Accept-Encoding headers.
    """
    return content
|
||||
|
||||
|
||||
def decode_gzip(content):
    """
    Gunzip *content*; returns None if the data is corrupt or truncated.
    """
    stream = cStringIO.StringIO(content)
    try:
        return gzip.GzipFile(fileobj=stream).read()
    except (IOError, EOFError):
        return None
|
||||
|
||||
|
||||
def encode_gzip(content):
    """
    Gzip-compress *content* and return the compressed bytes.
    """
    buf = cStringIO.StringIO()
    zf = gzip.GzipFile(fileobj=buf, mode='wb')
    zf.write(content)
    zf.close()
    return buf.getvalue()
|
||||
|
||||
|
||||
def decode_deflate(content):
    """
    Inflate DEFLATE-compressed *content*; returns None on error.

    Some servers respond with compressed data that lacks the zlib
    header and checksum.  Passing a negative window size to
    zlib.decompress enables lenient decompression of such data -- an
    undocumented feature, see http://bugs.python.org/issue5784
    """
    # First try a normal zlib stream, then fall back to a raw
    # (headerless, checksum-less) deflate stream.
    for wbits in (zlib.MAX_WBITS, -zlib.MAX_WBITS):
        try:
            return zlib.decompress(content, wbits)
        except zlib.error:
            pass
    return None
|
||||
|
||||
|
||||
def encode_deflate(content):
    """
    Deflate-compress *content*.

    The result always includes the zlib header and checksum.
    """
    compressed = zlib.compress(content)
    return compressed
|
@ -7,3 +7,16 @@ class HttpError(Exception):
|
||||
|
||||
class HttpErrorConnClosed(HttpError):
    # Marker subclass of HttpError; adds no behaviour of its own.
    pass
|
||||
|
||||
|
||||
|
||||
class HttpAuthenticationError(Exception):
    """
    Signals that proxy authentication is required (HTTP 407).
    """

    def __init__(self, auth_headers=None):
        message = "Proxy Authentication Required"
        super(HttpAuthenticationError, self).__init__(message)
        # Challenge headers to send back to the client, if any.
        self.headers = auth_headers
        self.code = 407

    def __repr__(self):
        return "Proxy Authentication Required"
|
||||
|
@ -375,7 +375,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
|
||||
@classmethod
|
||||
def has_chunked_encoding(self, headers):
|
||||
return "chunked" in [
|
||||
i.lower() for i in http.get_header_tokens(headers, "transfer-encoding")
|
||||
i.lower() for i in utils.get_header_tokens(headers, "transfer-encoding")
|
||||
]
|
||||
|
||||
|
||||
@ -482,9 +482,9 @@ class HTTP1Protocol(semantics.ProtocolMixin):
|
||||
port = int(port)
|
||||
except ValueError:
|
||||
return None
|
||||
if not http.is_valid_port(port):
|
||||
if not utils.is_valid_port(port):
|
||||
return None
|
||||
if not http.is_valid_host(host):
|
||||
if not utils.is_valid_host(host):
|
||||
return None
|
||||
return host, port, httpversion
|
||||
|
||||
@ -496,7 +496,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
|
||||
return None
|
||||
method, url, httpversion = v
|
||||
|
||||
parts = http.parse_url(url)
|
||||
parts = utils.parse_url(url)
|
||||
if not parts:
|
||||
return None
|
||||
scheme, host, port, path = parts
|
||||
@ -528,7 +528,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
|
||||
"""
|
||||
# At first, check if we have an explicit Connection header.
|
||||
if "connection" in headers:
|
||||
toks = http.get_header_tokens(headers, "connection")
|
||||
toks = utils.get_header_tokens(headers, "connection")
|
||||
if "close" in toks:
|
||||
return True
|
||||
elif "keep-alive" in toks:
|
||||
@ -556,34 +556,7 @@ class HTTP1Protocol(semantics.ProtocolMixin):
|
||||
|
||||
@classmethod
|
||||
def _assemble_request_first_line(self, request):
|
||||
if request.form_in == "relative":
|
||||
request_line = '%s %s HTTP/%s.%s' % (
|
||||
request.method,
|
||||
request.path,
|
||||
request.httpversion[0],
|
||||
request.httpversion[1],
|
||||
)
|
||||
elif request.form_in == "authority":
|
||||
request_line = '%s %s:%s HTTP/%s.%s' % (
|
||||
request.method,
|
||||
request.host,
|
||||
request.port,
|
||||
request.httpversion[0],
|
||||
request.httpversion[1],
|
||||
)
|
||||
elif request.form_in == "absolute":
|
||||
request_line = '%s %s://%s:%s%s HTTP/%s.%s' % (
|
||||
request.method,
|
||||
request.scheme,
|
||||
request.host,
|
||||
request.port,
|
||||
request.path,
|
||||
request.httpversion[0],
|
||||
request.httpversion[1],
|
||||
)
|
||||
else:
|
||||
raise http.HttpError(400, "Invalid request form")
|
||||
return request_line
|
||||
return request.legacy_first_line()
|
||||
|
||||
def _assemble_request_headers(self, request):
|
||||
headers = request.headers.copy()
|
||||
|
@ -3,9 +3,15 @@ import binascii
|
||||
import collections
|
||||
import string
|
||||
import sys
|
||||
import urllib
|
||||
import urlparse
|
||||
|
||||
from .. import utils, odict
|
||||
from . import cookies
|
||||
from netlib import utils, encoding
|
||||
|
||||
HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
|
||||
HDR_FORM_MULTIPART = "multipart/form-data"
|
||||
|
||||
CONTENT_MISSING = 0
|
||||
|
||||
@ -75,7 +81,240 @@ class Request(object):
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
return "Request(%s - %s, %s)" % (self.method, self.host, self.path)
|
||||
# return "Request(%s - %s, %s)" % (self.method, self.host, self.path)
|
||||
|
||||
return "<HTTPRequest: {0}>".format(
|
||||
self.legacy_first_line()[:-9]
|
||||
)
|
||||
|
||||
def legacy_first_line(self):
    """
    Assemble the HTTP/1.x request line for this request, formatted
    according to the request form (relative, authority or absolute).
    """
    version = 'HTTP/%s.%s' % (self.httpversion[0], self.httpversion[1])
    if self.form_in == "relative":
        return '%s %s %s' % (self.method, self.path, version)
    elif self.form_in == "authority":
        return '%s %s:%s %s' % (self.method, self.host, self.port, version)
    elif self.form_in == "absolute":
        return '%s %s://%s:%s%s %s' % (
            self.method,
            self.scheme,
            self.host,
            self.port,
            self.path,
            version,
        )
    else:
        # NOTE(review): `http` does not appear to be imported in this
        # module (imports are utils, odict, encoding, cookies) --
        # confirm this name resolves before relying on this branch.
        raise http.HttpError(400, "Invalid request form")
|
||||
|
||||
def anticache(self):
    """
    Modifies this request to remove headers that might produce a
    cached response, i.e. If-Modified-Since and If-None-Match.
    """
    for name in ("if-modified-since", "if-none-match"):
        del self.headers[name]
|
||||
|
||||
def anticomp(self):
    """
    Force an uncompressed response by pinning Accept-Encoding to
    "identity".
    """
    self.headers["accept-encoding"] = ["identity"]
|
||||
|
||||
def constrain_encoding(self):
    """
    Limits the permissible Accept-Encoding values, based on what we
    can decode appropriately.
    """
    accepted = self.headers["accept-encoding"]
    if accepted:
        # Keep only the encodings we know how to decode, preserving
        # the order of encoding.ENCODINGS.
        permitted = ', '.join(
            e for e in encoding.ENCODINGS if e in accepted[0]
        )
        self.headers["accept-encoding"] = [permitted]
|
||||
|
||||
def update_host_header(self):
    """
    Rewrite the Host header so it reflects the request's current
    target host.
    """
    self.headers["Host"] = [self.host]
|
||||
|
||||
def get_form(self):
|
||||
"""
|
||||
Retrieves the URL-encoded or multipart form data, returning an ODict object.
|
||||
Returns an empty ODict if there is no data or the content-type
|
||||
indicates non-form data.
|
||||
"""
|
||||
if self.body:
|
||||
if self.headers.in_any("content-type", HDR_FORM_URLENCODED, True):
|
||||
return self.get_form_urlencoded()
|
||||
elif self.headers.in_any("content-type", HDR_FORM_MULTIPART, True):
|
||||
return self.get_form_multipart()
|
||||
return odict.ODict([])
|
||||
|
||||
def get_form_urlencoded(self):
|
||||
"""
|
||||
Retrieves the URL-encoded form data, returning an ODict object.
|
||||
Returns an empty ODict if there is no data or the content-type
|
||||
indicates non-form data.
|
||||
"""
|
||||
if self.body and self.headers.in_any(
|
||||
"content-type",
|
||||
HDR_FORM_URLENCODED,
|
||||
True):
|
||||
return odict.ODict(utils.urldecode(self.body))
|
||||
return odict.ODict([])
|
||||
|
||||
def get_form_multipart(self):
|
||||
if self.body and self.headers.in_any(
|
||||
"content-type",
|
||||
HDR_FORM_MULTIPART,
|
||||
True):
|
||||
return odict.ODict(
|
||||
utils.multipartdecode(
|
||||
self.headers,
|
||||
self.body))
|
||||
return odict.ODict([])
|
||||
|
||||
def set_form_urlencoded(self, odict):
|
||||
"""
|
||||
Sets the body to the URL-encoded form data, and adds the
|
||||
appropriate content-type header. Note that this will destory the
|
||||
existing body if there is one.
|
||||
"""
|
||||
# FIXME: If there's an existing content-type header indicating a
|
||||
# url-encoded form, leave it alone.
|
||||
self.headers["Content-Type"] = [HDR_FORM_URLENCODED]
|
||||
self.body = utils.urlencode(odict.lst)
|
||||
|
||||
def get_path_components(self):
|
||||
"""
|
||||
Returns the path components of the URL as a list of strings.
|
||||
|
||||
Components are unquoted.
|
||||
"""
|
||||
_, _, path, _, _, _ = urlparse.urlparse(self.url)
|
||||
return [urllib.unquote(i) for i in path.split("/") if i]
|
||||
|
||||
def set_path_components(self, lst):
|
||||
"""
|
||||
Takes a list of strings, and sets the path component of the URL.
|
||||
|
||||
Components are quoted.
|
||||
"""
|
||||
lst = [urllib.quote(i, safe="") for i in lst]
|
||||
path = "/" + "/".join(lst)
|
||||
scheme, netloc, _, params, query, fragment = urlparse.urlparse(self.url)
|
||||
self.url = urlparse.urlunparse(
|
||||
[scheme, netloc, path, params, query, fragment]
|
||||
)
|
||||
|
||||
def get_query(self):
|
||||
"""
|
||||
Gets the request query string. Returns an ODict object.
|
||||
"""
|
||||
_, _, _, _, query, _ = urlparse.urlparse(self.url)
|
||||
if query:
|
||||
return odict.ODict(utils.urldecode(query))
|
||||
return odict.ODict([])
|
||||
|
||||
def set_query(self, odict):
|
||||
"""
|
||||
Takes an ODict object, and sets the request query string.
|
||||
"""
|
||||
scheme, netloc, path, params, _, fragment = urlparse.urlparse(self.url)
|
||||
query = utils.urlencode(odict.lst)
|
||||
self.url = urlparse.urlunparse(
|
||||
[scheme, netloc, path, params, query, fragment]
|
||||
)
|
||||
|
||||
def pretty_host(self, hostheader):
    """
    Heuristic to get the host of the request.

    Note that pretty_host() does not always return the TCP destination
    of the request, e.g. if an upstream proxy is in place.

    If *hostheader* is true, the Host header is used as additional
    (and preferred) data source.  This is handy in transparent mode,
    where only the IP of the destination is known but not the resolved
    name.  It is disabled by default, as an attacker may spoof the
    Host header to confuse an analyst.
    """
    host = self.headers.get_first("host") if hostheader else None
    if not host:
        host = self.host
    if not host:
        return None
    try:
        return host.encode("idna")
    except ValueError:
        # Not IDNA-encodable (e.g. a leading dot); hand back as-is.
        return host
|
||||
|
||||
def pretty_url(self, hostheader):
|
||||
if self.form_out == "authority": # upstream proxy mode
|
||||
return "%s:%s" % (self.pretty_host(hostheader), self.port)
|
||||
return utils.unparse_url(self.scheme,
|
||||
self.pretty_host(hostheader),
|
||||
self.port,
|
||||
self.path).encode('ascii')
|
||||
|
||||
def get_cookies(self):
|
||||
"""
|
||||
Returns a possibly empty netlib.odict.ODict object.
|
||||
"""
|
||||
ret = odict.ODict()
|
||||
for i in self.headers["cookie"]:
|
||||
ret.extend(cookies.parse_cookie_header(i))
|
||||
return ret
|
||||
|
||||
def set_cookies(self, odict):
|
||||
"""
|
||||
Takes an netlib.odict.ODict object. Over-writes any existing Cookie
|
||||
headers.
|
||||
"""
|
||||
v = cookies.format_cookie_header(odict)
|
||||
self.headers["Cookie"] = [v]
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
"""
|
||||
Returns a URL string, constructed from the Request's URL components.
|
||||
"""
|
||||
return utils.unparse_url(
|
||||
self.scheme,
|
||||
self.host,
|
||||
self.port,
|
||||
self.path
|
||||
).encode('ascii')
|
||||
|
||||
@url.setter
|
||||
def url(self, url):
|
||||
"""
|
||||
Parses a URL specification, and updates the Request's information
|
||||
accordingly.
|
||||
|
||||
Returns False if the URL was invalid, True if the request succeeded.
|
||||
"""
|
||||
parts = utils.parse_url(url)
|
||||
if not parts:
|
||||
raise ValueError("Invalid URL: %s" % url)
|
||||
self.scheme, self.host, self.port, self.path = parts
|
||||
|
||||
@property
|
||||
def content(self):
|
||||
@ -139,7 +378,56 @@ class Response(object):
|
||||
return False
|
||||
|
||||
def __repr__(self):
|
||||
return "Response(%s - %s)" % (self.status_code, self.msg)
|
||||
# return "Response(%s - %s)" % (self.status_code, self.msg)
|
||||
|
||||
if self.body:
|
||||
size = utils.pretty_size(len(self.body))
|
||||
else:
|
||||
size = "content missing"
|
||||
return "<HTTPResponse: {status_code} {msg} ({contenttype}, {size})>".format(
|
||||
status_code=self.status_code,
|
||||
msg=self.msg,
|
||||
contenttype=self.headers.get_first(
|
||||
"content-type", "unknown content type"
|
||||
),
|
||||
size=size
|
||||
)
|
||||
|
||||
|
||||
def get_cookies(self):
|
||||
"""
|
||||
Get the contents of all Set-Cookie headers.
|
||||
|
||||
Returns a possibly empty ODict, where keys are cookie name strings,
|
||||
and values are [value, attr] lists. Value is a string, and attr is
|
||||
an ODictCaseless containing cookie attributes. Within attrs, unary
|
||||
attributes (e.g. HTTPOnly) are indicated by a Null value.
|
||||
"""
|
||||
ret = []
|
||||
for header in self.headers["set-cookie"]:
|
||||
v = cookies.parse_set_cookie_header(header)
|
||||
if v:
|
||||
name, value, attrs = v
|
||||
ret.append([name, [value, attrs]])
|
||||
return odict.ODict(ret)
|
||||
|
||||
def set_cookies(self, odict):
|
||||
"""
|
||||
Set the Set-Cookie headers on this response, over-writing existing
|
||||
headers.
|
||||
|
||||
Accepts an ODict of the same format as that returned by get_cookies.
|
||||
"""
|
||||
values = []
|
||||
for i in odict.lst:
|
||||
values.append(
|
||||
cookies.format_set_cookie_header(
|
||||
i[0],
|
||||
i[1][0],
|
||||
i[1][1]
|
||||
)
|
||||
)
|
||||
self.headers["Set-Cookie"] = values
|
||||
|
||||
@property
|
||||
def content(self):
|
||||
@ -160,77 +448,3 @@ class Response(object):
|
||||
def code(self, code):
|
||||
# TODO: remove deprecated setter
|
||||
self.status_code = code
|
||||
|
||||
|
||||
|
||||
def is_valid_port(port):
|
||||
if not 0 <= port <= 65535:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_valid_host(host):
|
||||
try:
|
||||
host.decode("idna")
|
||||
except ValueError:
|
||||
return False
|
||||
if "\0" in host:
|
||||
return None
|
||||
return True
|
||||
|
||||
|
||||
def parse_url(url):
|
||||
"""
|
||||
Returns a (scheme, host, port, path) tuple, or None on error.
|
||||
|
||||
Checks that:
|
||||
port is an integer 0-65535
|
||||
host is a valid IDNA-encoded hostname with no null-bytes
|
||||
path is valid ASCII
|
||||
"""
|
||||
try:
|
||||
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
|
||||
except ValueError:
|
||||
return None
|
||||
if not scheme:
|
||||
return None
|
||||
if '@' in netloc:
|
||||
# FIXME: Consider what to do with the discarded credentials here Most
|
||||
# probably we should extend the signature to return these as a separate
|
||||
# value.
|
||||
_, netloc = string.rsplit(netloc, '@', maxsplit=1)
|
||||
if ':' in netloc:
|
||||
host, port = string.rsplit(netloc, ':', maxsplit=1)
|
||||
try:
|
||||
port = int(port)
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
host = netloc
|
||||
if scheme == "https":
|
||||
port = 443
|
||||
else:
|
||||
port = 80
|
||||
path = urlparse.urlunparse(('', '', path, params, query, fragment))
|
||||
if not path.startswith("/"):
|
||||
path = "/" + path
|
||||
if not is_valid_host(host):
|
||||
return None
|
||||
if not utils.isascii(path):
|
||||
return None
|
||||
if not is_valid_port(port):
|
||||
return None
|
||||
return scheme, host, port, path
|
||||
|
||||
|
||||
def get_header_tokens(headers, key):
|
||||
"""
|
||||
Retrieve all tokens for a header key. A number of different headers
|
||||
follow a pattern where each header line can containe comma-separated
|
||||
tokens, and headers can be set multiple times.
|
||||
"""
|
||||
toks = []
|
||||
for i in headers[key]:
|
||||
for j in i.split(","):
|
||||
toks.append(j.strip())
|
||||
return toks
|
||||
|
@ -1,10 +1,11 @@
|
||||
import cStringIO
|
||||
import tempfile
|
||||
import os
|
||||
import time
|
||||
import shutil
|
||||
from contextlib import contextmanager
|
||||
|
||||
from netlib import tcp, utils
|
||||
from netlib import tcp, utils, odict, http
|
||||
|
||||
|
||||
def treader(bytes):
|
||||
@ -66,3 +67,59 @@ def raises(exc, obj, *args, **kwargs):
|
||||
raise AssertionError("No exception raised. Return value: {}".format(ret))
|
||||
|
||||
test_data = utils.Data(__name__)
|
||||
|
||||
|
||||
|
||||
|
||||
def treq(content="content", scheme="http", host="address", port=22):
    """
    Build a canned relative-form GET request for tests.

    @return: libmproxy.protocol.http.HTTPRequest
    """
    headers = odict.ODictCaseless()
    headers["header"] = ["qvalue"]
    return http.Request(
        "relative",
        "GET",
        scheme,
        host,
        port,
        "/path",
        (1, 1),
        headers,
        content,
        None,
        None,
    )
|
||||
|
||||
|
||||
def treq_absolute(content="content"):
    """
    Build a canned absolute-form request for tests.

    @return: libmproxy.protocol.http.HTTPRequest
    """
    req = treq(content)
    req.form_in = "absolute"
    req.form_out = "absolute"
    req.host = "address"
    req.port = 22
    req.scheme = "http"
    return req
|
||||
|
||||
|
||||
def tresp(content="message"):
    """
    Build a canned 200 OK response for tests.

    @return: libmproxy.protocol.http.HTTPResponse
    """
    headers = odict.ODictCaseless()
    headers["header_response"] = ["svalue"]
    return http.semantics.Response(
        (1, 1),
        200,
        "OK",
        headers,
        content,
        time.time(),
        time.time(),
    )
|
100
netlib/utils.py
100
netlib/utils.py
@ -1,5 +1,10 @@
|
||||
from __future__ import (absolute_import, print_function, division)
|
||||
import os.path
|
||||
import cgi
|
||||
import urllib
|
||||
import urlparse
|
||||
import string
|
||||
|
||||
|
||||
def isascii(s):
|
||||
try:
|
||||
@ -131,6 +136,81 @@ class Data(object):
|
||||
return fullpath
|
||||
|
||||
|
||||
|
||||
|
||||
def is_valid_port(port):
    """
    True iff *port* lies in the valid TCP/UDP port range 0-65535.
    """
    return 0 <= port <= 65535
|
||||
|
||||
|
||||
def is_valid_host(host):
    """
    Check that *host* is a plausible hostname: it must be
    IDNA-decodable and must not contain null bytes.

    Returns a boolean.
    """
    try:
        host.decode("idna")
    except ValueError:
        return False
    if "\0" in host:
        # Fixed: this branch previously returned None while every other
        # branch returned a bool; return False for a consistent type.
        return False
    return True
|
||||
|
||||
|
||||
def parse_url(url):
    """
    Returns a (scheme, host, port, path) tuple, or None on error.

    Checks that:
        port is an integer 0-65535
        host is a valid IDNA-encoded hostname with no null-bytes
        path is valid ASCII
    """
    try:
        scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    except ValueError:
        return None
    if not scheme:
        return None
    if '@' in netloc:
        # FIXME: Consider what to do with the discarded credentials here Most
        # probably we should extend the signature to return these as a separate
        # value.
        _, netloc = string.rsplit(netloc, '@', maxsplit=1)
    if ':' in netloc:
        # Explicit port; rsplit so IPv6-style colons in the host part
        # don't split too early.
        host, port = string.rsplit(netloc, ':', maxsplit=1)
        try:
            port = int(port)
        except ValueError:
            return None
    else:
        # No explicit port: fall back to the scheme's default.
        host = netloc
        if scheme == "https":
            port = 443
        else:
            port = 80
    # Re-assemble everything after the netloc into the path component.
    path = urlparse.urlunparse(('', '', path, params, query, fragment))
    if not path.startswith("/"):
        path = "/" + path
    if not is_valid_host(host):
        return None
    if not isascii(path):
        return None
    if not is_valid_port(port):
        return None
    return scheme, host, port, path
|
||||
|
||||
|
||||
def get_header_tokens(headers, key):
    """
    Retrieve all tokens for a header key. A number of different headers
    follow a pattern where each header line can contain comma-separated
    tokens, and headers can be set multiple times.
    """
    return [
        token.strip()
        for line in headers[key]
        for token in line.split(",")
    ]
|
||||
|
||||
|
||||
def hostport(scheme, host, port):
|
||||
"""
|
||||
Returns the host component, with a port specifcation if needed.
|
||||
@ -139,3 +219,23 @@ def hostport(scheme, host, port):
|
||||
return host
|
||||
else:
|
||||
return "%s:%s" % (host, port)
|
||||
|
||||
def unparse_url(scheme, host, port, path=""):
    """
    Build a URL string from the specified components; the port is only
    included when it is not the scheme's default (see hostport()).
    """
    netloc = hostport(scheme, host, port)
    return "%s://%s%s" % (scheme, netloc, path)
|
||||
|
||||
|
||||
def urlencode(s):
    """
    Takes a list of (key, value) tuples and returns a urlencoded string.
    """
    pairs = [tuple(item) for item in s]
    # Second argument is doseq=False: values are quoted as-is.
    return urllib.urlencode(pairs, False)
|
||||
|
||||
def urldecode(s):
    """
    Takes a urlencoded string and returns a list of (key, value) tuples.
    """
    # keep_blank_values preserves keys whose value is empty.
    pairs = cgi.parse_qsl(s, keep_blank_values=True)
    return pairs
|
||||
|
@ -75,16 +75,6 @@ def test_connection_close():
|
||||
assert HTTP1Protocol.connection_close((1, 1), h)
|
||||
|
||||
|
||||
def test_get_header_tokens():
|
||||
h = odict.ODictCaseless()
|
||||
assert http.get_header_tokens(h, "foo") == []
|
||||
h["foo"] = ["bar"]
|
||||
assert http.get_header_tokens(h, "foo") == ["bar"]
|
||||
h["foo"] = ["bar, voing"]
|
||||
assert http.get_header_tokens(h, "foo") == ["bar", "voing"]
|
||||
h["foo"] = ["bar, voing", "oink"]
|
||||
assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
|
||||
|
||||
|
||||
def test_read_http_body_request():
|
||||
h = odict.ODictCaseless()
|
||||
|
6
test/http/test_exceptions.py
Normal file
6
test/http/test_exceptions.py
Normal file
@ -0,0 +1,6 @@
|
||||
from netlib.http.exceptions import *
|
||||
|
||||
def test_HttpAuthenticationError():
|
||||
x = HttpAuthenticationError({"foo": "bar"})
|
||||
assert str(x)
|
||||
assert "foo" in x.headers
|
@ -1,54 +1,267 @@
|
||||
import cStringIO
|
||||
import textwrap
|
||||
import binascii
|
||||
from mock import MagicMock
|
||||
|
||||
from netlib import http, odict, tcp
|
||||
from netlib.http import http1
|
||||
from netlib.http.semantics import CONTENT_MISSING
|
||||
from .. import tutils, tservers
|
||||
|
||||
def test_httperror():
|
||||
e = http.exceptions.HttpError(404, "Not found")
|
||||
assert str(e)
|
||||
|
||||
class TestRequest:
|
||||
# def test_asterisk_form_in(self):
|
||||
# f = tutils.tflow(req=None)
|
||||
# protocol = mock_protocol("OPTIONS * HTTP/1.1")
|
||||
# f.request = HTTPRequest.from_protocol(protocol)
|
||||
#
|
||||
# assert f.request.form_in == "relative"
|
||||
# f.request.host = f.server_conn.address.host
|
||||
# f.request.port = f.server_conn.address.port
|
||||
# f.request.scheme = "http"
|
||||
# assert protocol.assemble(f.request) == (
|
||||
# "OPTIONS * HTTP/1.1\r\n"
|
||||
# "Host: address:22\r\n"
|
||||
# "Content-Length: 0\r\n\r\n")
|
||||
#
|
||||
# def test_relative_form_in(self):
|
||||
# protocol = mock_protocol("GET /foo\xff HTTP/1.1")
|
||||
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
||||
#
|
||||
# protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
|
||||
# r = HTTPRequest.from_protocol(protocol)
|
||||
# assert r.headers["Upgrade"] == ["h2c"]
|
||||
#
|
||||
# def test_expect_header(self):
|
||||
# protocol = mock_protocol(
|
||||
# "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
|
||||
# r = HTTPRequest.from_protocol(protocol)
|
||||
# assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
|
||||
# assert r.content == "foo"
|
||||
# assert protocol.tcp_handler.rfile.read(3) == "bar"
|
||||
#
|
||||
# def test_authority_form_in(self):
|
||||
# protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
|
||||
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
||||
#
|
||||
# protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
|
||||
# r = HTTPRequest.from_protocol(protocol)
|
||||
# r.scheme, r.host, r.port = "http", "address", 22
|
||||
# assert protocol.assemble(r) == (
|
||||
# "CONNECT address:22 HTTP/1.1\r\n"
|
||||
# "Host: address:22\r\n"
|
||||
# "Content-Length: 0\r\n\r\n")
|
||||
# assert r.pretty_url(False) == "address:22"
|
||||
#
|
||||
# def test_absolute_form_in(self):
|
||||
# protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
|
||||
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
||||
#
|
||||
# protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
|
||||
# r = HTTPRequest.from_protocol(protocol)
|
||||
# assert protocol.assemble(r) == (
|
||||
# "GET http://address:22/ HTTP/1.1\r\n"
|
||||
# "Host: address:22\r\n"
|
||||
# "Content-Length: 0\r\n\r\n")
|
||||
#
|
||||
# def test_http_options_relative_form_in(self):
|
||||
# """
|
||||
# Exercises fix for Issue #392.
|
||||
# """
|
||||
# protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
|
||||
# r = HTTPRequest.from_protocol(protocol)
|
||||
# r.host = 'address'
|
||||
# r.port = 80
|
||||
# r.scheme = "http"
|
||||
# assert protocol.assemble(r) == (
|
||||
# "OPTIONS /secret/resource HTTP/1.1\r\n"
|
||||
# "Host: address\r\n"
|
||||
# "Content-Length: 0\r\n\r\n")
|
||||
#
|
||||
# def test_http_options_absolute_form_in(self):
|
||||
# protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
|
||||
# r = HTTPRequest.from_protocol(protocol)
|
||||
# r.host = 'address'
|
||||
# r.port = 80
|
||||
# r.scheme = "http"
|
||||
# assert protocol.assemble(r) == (
|
||||
# "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
|
||||
# "Host: address\r\n"
|
||||
# "Content-Length: 0\r\n\r\n")
|
||||
|
||||
def test_parse_url():
|
||||
assert not http.parse_url("")
|
||||
def test_set_url(self):
|
||||
r = tutils.treq_absolute()
|
||||
r.url = "https://otheraddress:42/ORLY"
|
||||
assert r.scheme == "https"
|
||||
assert r.host == "otheraddress"
|
||||
assert r.port == 42
|
||||
assert r.path == "/ORLY"
|
||||
|
||||
u = "http://foo.com:8888/test"
|
||||
s, h, po, pa = http.parse_url(u)
|
||||
assert s == "http"
|
||||
assert h == "foo.com"
|
||||
assert po == 8888
|
||||
assert pa == "/test"
|
||||
def test_repr(self):
|
||||
r = tutils.treq()
|
||||
assert repr(r)
|
||||
|
||||
s, h, po, pa = http.parse_url("http://foo/bar")
|
||||
assert s == "http"
|
||||
assert h == "foo"
|
||||
assert po == 80
|
||||
assert pa == "/bar"
|
||||
def test_pretty_host(self):
|
||||
r = tutils.treq()
|
||||
assert r.pretty_host(True) == "address"
|
||||
assert r.pretty_host(False) == "address"
|
||||
r.headers["host"] = ["other"]
|
||||
assert r.pretty_host(True) == "other"
|
||||
assert r.pretty_host(False) == "address"
|
||||
r.host = None
|
||||
assert r.pretty_host(True) == "other"
|
||||
assert r.pretty_host(False) is None
|
||||
del r.headers["host"]
|
||||
assert r.pretty_host(True) is None
|
||||
assert r.pretty_host(False) is None
|
||||
|
||||
s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
|
||||
assert s == "http"
|
||||
assert h == "foo"
|
||||
assert po == 80
|
||||
assert pa == "/bar"
|
||||
# Invalid IDNA
|
||||
r.headers["host"] = [".disqus.com"]
|
||||
assert r.pretty_host(True) == ".disqus.com"
|
||||
|
||||
s, h, po, pa = http.parse_url("http://foo")
|
||||
assert pa == "/"
|
||||
def test_get_form_for_urlencoded(self):
|
||||
r = tutils.treq()
|
||||
r.headers.add("content-type", "application/x-www-form-urlencoded")
|
||||
r.get_form_urlencoded = MagicMock()
|
||||
|
||||
s, h, po, pa = http.parse_url("https://foo")
|
||||
assert po == 443
|
||||
r.get_form()
|
||||
|
||||
assert not http.parse_url("https://foo:bar")
|
||||
assert not http.parse_url("https://foo:")
|
||||
assert r.get_form_urlencoded.called
|
||||
|
||||
# Invalid IDNA
|
||||
assert not http.parse_url("http://\xfafoo")
|
||||
# Invalid PATH
|
||||
assert not http.parse_url("http:/\xc6/localhost:56121")
|
||||
# Null byte in host
|
||||
assert not http.parse_url("http://foo\0")
|
||||
# Port out of range
|
||||
assert not http.parse_url("http://foo:999999")
|
||||
# Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
|
||||
assert not http.parse_url('http://lo[calhost')
|
||||
def test_get_form_for_multipart(self):
|
||||
r = tutils.treq()
|
||||
r.headers.add("content-type", "multipart/form-data")
|
||||
r.get_form_multipart = MagicMock()
|
||||
|
||||
r.get_form()
|
||||
|
||||
assert r.get_form_multipart.called
|
||||
|
||||
def test_get_cookies_none(self):
|
||||
h = odict.ODictCaseless()
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
assert len(r.get_cookies()) == 0
|
||||
|
||||
def test_get_cookies_single(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = ["cookiename=cookievalue"]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
assert len(result) == 1
|
||||
assert result['cookiename'] == ['cookievalue']
|
||||
|
||||
def test_get_cookies_double(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = [
|
||||
"cookiename=cookievalue;othercookiename=othercookievalue"
|
||||
]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
assert len(result) == 2
|
||||
assert result['cookiename'] == ['cookievalue']
|
||||
assert result['othercookiename'] == ['othercookievalue']
|
||||
|
||||
def test_get_cookies_withequalsign(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = [
|
||||
"cookiename=coo=kievalue;othercookiename=othercookievalue"
|
||||
]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
assert len(result) == 2
|
||||
assert result['cookiename'] == ['coo=kievalue']
|
||||
assert result['othercookiename'] == ['othercookievalue']
|
||||
|
||||
def test_set_cookies(self):
|
||||
h = odict.ODictCaseless()
|
||||
h["Cookie"] = ["cookiename=cookievalue"]
|
||||
r = tutils.treq()
|
||||
r.headers = h
|
||||
result = r.get_cookies()
|
||||
result["cookiename"] = ["foo"]
|
||||
r.set_cookies(result)
|
||||
assert r.get_cookies()["cookiename"] == ["foo"]
|
||||
|
||||
|
||||
class TestResponse(object):

    def test_repr(self):
        """repr() reflects the content type and tolerates a missing body."""
        response = tutils.tresp()
        assert "unknown content type" in repr(response)
        response.headers["content-type"] = ["foo"]
        assert "foo" in repr(response)
        assert repr(tutils.tresp(content=CONTENT_MISSING))

    def test_get_cookies_none(self):
        """With no Set-Cookie header, no cookies are reported."""
        response = tutils.tresp()
        response.headers = odict.ODictCaseless()
        assert not response.get_cookies()

    def test_get_cookies_simple(self):
        """A bare name=value Set-Cookie yields one entry with empty attributes."""
        headers = odict.ODictCaseless()
        headers["Set-Cookie"] = ["cookiename=cookievalue"]
        response = tutils.tresp()
        response.headers = headers
        cookies = response.get_cookies()
        assert len(cookies) == 1
        assert "cookiename" in cookies
        assert cookies["cookiename"][0] == ["cookievalue", odict.ODict()]

    def test_get_cookies_with_parameters(self):
        """Cookie attributes (domain, expires, path, flags) are captured."""
        headers = odict.ODictCaseless()
        headers["Set-Cookie"] = [
            "cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
        response = tutils.tresp()
        response.headers = headers
        cookies = response.get_cookies()
        assert len(cookies) == 1
        assert "cookiename" in cookies
        value, attributes = cookies["cookiename"][0]
        assert value == "cookievalue"
        assert len(attributes) == 4
        assert attributes["domain"] == ["example.com"]
        assert attributes["expires"] == ["Wed Oct 21 16:29:41 2015"]
        assert attributes["path"] == ["/"]
        # Value-less flags such as HttpOnly are stored as None.
        assert attributes["httponly"] == [None]

    def test_get_cookies_no_value(self):
        """A cookie with an empty value is still reported, with its attributes."""
        headers = odict.ODictCaseless()
        headers["Set-Cookie"] = [
            "cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
        ]
        response = tutils.tresp()
        response.headers = headers
        cookies = response.get_cookies()
        assert len(cookies) == 1
        assert "cookiename" in cookies
        assert cookies["cookiename"][0][0] == ""
        assert len(cookies["cookiename"][0][1]) == 2

    def test_get_cookies_twocookies(self):
        """Two Set-Cookie headers produce two independent entries."""
        headers = odict.ODictCaseless()
        headers["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
        response = tutils.tresp()
        response.headers = headers
        cookies = response.get_cookies()
        assert len(cookies) == 2
        assert "cookiename" in cookies
        assert cookies["cookiename"][0] == ["cookievalue", odict.ODict()]
        assert "othercookie" in cookies
        assert cookies["othercookie"][0] == ["othervalue", odict.ODict()]

    def test_set_cookies(self):
        """Cookies written via set_cookies round-trip through get_cookies."""
        response = tutils.tresp()
        jar = response.get_cookies()
        jar.add("foo", ["bar", odict.ODictCaseless()])
        response.set_cookies(jar)

        jar = response.get_cookies()
        assert len(jar) == 1
        assert jar["foo"] == [["bar", odict.ODictCaseless()]]
@ -1,4 +1,6 @@
|
||||
from netlib import utils
|
||||
import urlparse
|
||||
|
||||
from netlib import utils, odict
|
||||
import tutils
|
||||
|
||||
|
||||
@ -27,3 +29,76 @@ def test_pretty_size():
|
||||
assert utils.pretty_size(1024) == "1kB"
|
||||
assert utils.pretty_size(1024 + (1024 / 2.0)) == "1.5kB"
|
||||
assert utils.pretty_size(1024 * 1024) == "1MB"
|
||||
|
||||
|
||||
|
||||
|
||||
def test_parse_url():
    """parse_url splits a URL into (scheme, host, port, path); falsy on bad input."""
    assert not utils.parse_url("")

    url = "http://foo.com:8888/test"
    scheme, host, port, path = utils.parse_url(url)
    assert scheme == "http"
    assert host == "foo.com"
    assert port == 8888
    assert path == "/test"

    # Default port for plain http.
    scheme, host, port, path = utils.parse_url("http://foo/bar")
    assert scheme == "http"
    assert host == "foo"
    assert port == 80
    assert path == "/bar"

    # user:pass credentials do not leak into the returned host.
    scheme, host, port, path = utils.parse_url("http://user:pass@foo/bar")
    assert scheme == "http"
    assert host == "foo"
    assert port == 80
    assert path == "/bar"

    # An empty path is normalised to "/".
    scheme, host, port, path = utils.parse_url("http://foo")
    assert path == "/"

    # Default port for https.
    scheme, host, port, path = utils.parse_url("https://foo")
    assert port == 443

    # Malformed or missing port specifications are rejected.
    assert not utils.parse_url("https://foo:bar")
    assert not utils.parse_url("https://foo:")

    # Invalid IDNA
    assert not utils.parse_url("http://\xfafoo")
    # Invalid PATH
    assert not utils.parse_url("http:/\xc6/localhost:56121")
    # Null byte in host
    assert not utils.parse_url("http://foo\0")
    # Port out of range
    assert not utils.parse_url("http://foo:999999")
    # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
    assert not utils.parse_url('http://lo[calhost')
def test_unparse_url():
    """unparse_url omits the port only when it is the scheme's default."""
    cases = [
        (("http", "foo.com", 99, ""), "http://foo.com:99"),
        (("http", "foo.com", 80, ""), "http://foo.com"),
        (("https", "foo.com", 80, ""), "https://foo.com:80"),
        (("https", "foo.com", 443, ""), "https://foo.com"),
    ]
    for args, expected in cases:
        assert utils.unparse_url(*args) == expected
def test_urlencode():
    """Encoding a non-empty pair list yields a truthy (non-empty) result."""
    pairs = [('foo', 'bar')]
    assert utils.urlencode(pairs)
def test_urldecode():
    """Decoding a two-pair query string yields two items."""
    encoded = "one=two&three=four"
    decoded = utils.urldecode(encoded)
    assert len(decoded) == 2
def test_get_header_tokens():
    """Tokens are split on commas and collected across all header instances."""
    headers = odict.ODictCaseless()
    assert utils.get_header_tokens(headers, "foo") == []

    headers["foo"] = ["bar"]
    assert utils.get_header_tokens(headers, "foo") == ["bar"]

    headers["foo"] = ["bar, voing"]
    assert utils.get_header_tokens(headers, "foo") == ["bar", "voing"]

    headers["foo"] = ["bar, voing", "oink"]
    assert utils.get_header_tokens(headers, "foo") == ["bar", "voing", "oink"]
Loading…
Reference in New Issue
Block a user