Merge branch 'http-models'

This commit is contained in:
Maximilian Hils 2015-09-28 13:53:59 +02:00
commit 67229fbdf7
28 changed files with 1320 additions and 853 deletions

View File

@ -12,6 +12,8 @@ ENCODINGS = {"identity", "gzip", "deflate"}
def decode(e, content):
if not isinstance(content, bytes):
return None
encoding_map = {
"identity": identity,
"gzip": decode_gzip,
@ -23,6 +25,8 @@ def decode(e, content):
def encode(e, content):
if not isinstance(content, bytes):
return None
encoding_map = {
"identity": identity,
"gzip": encode_gzip,

View File

@ -1,14 +1,14 @@
from __future__ import absolute_import, print_function, division
from .request import Request
from .response import Response
from .headers import Headers
from .models import Request, Response
from .models import ALPN_PROTO_HTTP1, ALPN_PROTO_H2
from .models import HDR_FORM_MULTIPART, HDR_FORM_URLENCODED, CONTENT_MISSING
from .message import decoded, CONTENT_MISSING
from . import http1, http2
__all__ = [
"Request",
"Response",
"Headers",
"Request", "Response",
"ALPN_PROTO_HTTP1", "ALPN_PROTO_H2",
"HDR_FORM_MULTIPART", "HDR_FORM_URLENCODED", "CONTENT_MISSING",
"decoded", "CONTENT_MISSING",
"http1", "http2",
]

View File

@ -58,6 +58,7 @@ def _read_quoted_string(s, start):
escaping = False
ret = []
# Skip the first quote
i = start # initialize in case the loop doesn't run.
for i in range(start + 1, len(s)):
if escaping:
ret.append(s[i])

View File

@ -8,15 +8,15 @@ from __future__ import absolute_import, print_function, division
import copy
try:
from collections.abc import MutableMapping
except ImportError: # Workaround for Python < 3.3
from collections import MutableMapping
except ImportError: # pragma: nocover
from collections import MutableMapping # Workaround for Python < 3.3
import six
from netlib.utils import always_byte_args, always_bytes
if six.PY2:
if six.PY2: # pragma: nocover
_native = lambda x: x
_always_bytes = lambda x: x
_always_byte_args = lambda x: x
@ -27,7 +27,7 @@ else:
_always_byte_args = always_byte_args("utf-8", "surrogateescape")
class Headers(MutableMapping, object):
class Headers(MutableMapping):
"""
Header class which allows both convenient access to individual headers as well as
direct access to the underlying raw data. Provides a full dictionary interface.
@ -36,12 +36,8 @@ class Headers(MutableMapping, object):
.. code-block:: python
# Create header from a list of (header_name, header_value) tuples
>>> h = Headers([
["Host","example.com"],
["Accept","text/html"],
["accept","application/xml"]
])
# Create headers with keyword arguments
>>> h = Headers(host="example.com", content_type="application/xml")
# Headers mostly behave like a normal dict.
>>> h["Host"]
@ -51,6 +47,13 @@ class Headers(MutableMapping, object):
>>> h["host"]
"example.com"
# Headers can also be creatd from a list of raw (header_name, header_value) byte tuples
>>> h = Headers([
[b"Host",b"example.com"],
[b"Accept",b"text/html"],
[b"accept",b"application/xml"]
])
# Multiple headers are folded into a single header as per RFC7230
>>> h["Accept"]
"text/html, application/xml"
@ -60,17 +63,14 @@ class Headers(MutableMapping, object):
>>> h["Accept"]
"application/text"
# str(h) returns a HTTP1 header block.
>>> print(h)
# bytes(h) returns a HTTP1 header block.
>>> print(bytes(h))
Host: example.com
Accept: application/text
# For full control, the raw header fields can be accessed
>>> h.fields
# Headers can also be crated from keyword arguments
>>> h = Headers(host="example.com", content_type="application/xml")
Caveats:
For use with the "Set-Cookie" header, see :py:meth:`get_all`.
"""
@ -79,8 +79,8 @@ class Headers(MutableMapping, object):
def __init__(self, fields=None, **headers):
"""
Args:
fields: (optional) list of ``(name, value)`` header tuples,
e.g. ``[("Host","example.com")]``. All names and values must be bytes.
fields: (optional) list of ``(name, value)`` header byte tuples,
e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
**headers: Additional headers to set. Will overwrite existing values from `fields`.
For convenience, underscores in header names will be transformed to dashes -
this behaviour does not extend to other methods.
@ -106,7 +106,7 @@ class Headers(MutableMapping, object):
else:
return b""
if six.PY2:
if six.PY2: # pragma: nocover
__str__ = __bytes__
@_always_byte_args

View File

@ -7,30 +7,30 @@ from .. import CONTENT_MISSING
def assemble_request(request):
if request.body == CONTENT_MISSING:
if request.content == CONTENT_MISSING:
raise HttpException("Cannot assemble flow with CONTENT_MISSING")
head = assemble_request_head(request)
body = b"".join(assemble_body(request.headers, [request.body]))
body = b"".join(assemble_body(request.data.headers, [request.data.content]))
return head + body
def assemble_request_head(request):
first_line = _assemble_request_line(request)
headers = _assemble_request_headers(request)
first_line = _assemble_request_line(request.data)
headers = _assemble_request_headers(request.data)
return b"%s\r\n%s\r\n" % (first_line, headers)
def assemble_response(response):
if response.body == CONTENT_MISSING:
if response.content == CONTENT_MISSING:
raise HttpException("Cannot assemble flow with CONTENT_MISSING")
head = assemble_response_head(response)
body = b"".join(assemble_body(response.headers, [response.body]))
body = b"".join(assemble_body(response.data.headers, [response.data.content]))
return head + body
def assemble_response_head(response):
first_line = _assemble_response_line(response)
headers = _assemble_response_headers(response)
first_line = _assemble_response_line(response.data)
headers = _assemble_response_headers(response.data)
return b"%s\r\n%s\r\n" % (first_line, headers)
@ -45,51 +45,58 @@ def assemble_body(headers, body_chunks):
yield chunk
def _assemble_request_line(request, form=None):
if form is None:
form = request.form_out
def _assemble_request_line(request_data):
"""
Args:
request_data (netlib.http.request.RequestData)
"""
form = request_data.first_line_format
if form == "relative":
return b"%s %s %s" % (
request.method,
request.path,
request.http_version
request_data.method,
request_data.path,
request_data.http_version
)
elif form == "authority":
return b"%s %s:%d %s" % (
request.method,
request.host,
request.port,
request.http_version
request_data.method,
request_data.host,
request_data.port,
request_data.http_version
)
elif form == "absolute":
return b"%s %s://%s:%d%s %s" % (
request.method,
request.scheme,
request.host,
request.port,
request.path,
request.http_version
request_data.method,
request_data.scheme,
request_data.host,
request_data.port,
request_data.path,
request_data.http_version
)
else: # pragma: nocover
else:
raise RuntimeError("Invalid request form")
def _assemble_request_headers(request):
headers = request.headers.copy()
if "host" not in headers and request.scheme and request.host and request.port:
def _assemble_request_headers(request_data):
"""
Args:
request_data (netlib.http.request.RequestData)
"""
headers = request_data.headers.copy()
if "host" not in headers and request_data.scheme and request_data.host and request_data.port:
headers["host"] = utils.hostport(
request.scheme,
request.host,
request.port
request_data.scheme,
request_data.host,
request_data.port
)
return bytes(headers)
def _assemble_response_line(response):
def _assemble_response_line(response_data):
return b"%s %d %s" % (
response.http_version,
response.status_code,
response.msg,
response_data.http_version,
response_data.status_code,
response_data.reason,
)

View File

@ -11,7 +11,7 @@ from .. import Request, Response, Headers
def read_request(rfile, body_size_limit=None):
request = read_request_head(rfile)
expected_body_size = expected_http_body_size(request)
request._body = b"".join(read_body(rfile, expected_body_size, limit=body_size_limit))
request.data.content = b"".join(read_body(rfile, expected_body_size, limit=body_size_limit))
request.timestamp_end = time.time()
return request
@ -50,7 +50,7 @@ def read_request_head(rfile):
def read_response(rfile, request, body_size_limit=None):
response = read_response_head(rfile)
expected_body_size = expected_http_body_size(request, response)
response._body = b"".join(read_body(rfile, expected_body_size, body_size_limit))
response.data.content = b"".join(read_body(rfile, expected_body_size, body_size_limit))
response.timestamp_end = time.time()
return response
@ -155,7 +155,7 @@ def connection_close(http_version, headers):
# If we don't have a Connection header, HTTP 1.1 connections are assumed to
# be persistent
return http_version != b"HTTP/1.1"
return http_version != "HTTP/1.1" and http_version != b"HTTP/1.1" # FIXME: Remove one case.
def expected_http_body_size(request, response=None):
@ -184,11 +184,11 @@ def expected_http_body_size(request, response=None):
if headers.get("expect", "").lower() == "100-continue":
return 0
else:
if request.method.upper() == b"HEAD":
if request.method.upper() == "HEAD":
return 0
if 100 <= response_code <= 199:
return 0
if response_code == 200 and request.method.upper() == b"CONNECT":
if response_code == 200 and request.method.upper() == "CONNECT":
return 0
if response_code in (204, 304):
return 0

View File

@ -4,7 +4,7 @@ import time
from hpack.hpack import Encoder, Decoder
from ... import utils
from .. import Headers, Response, Request, ALPN_PROTO_H2
from .. import Headers, Response, Request
from . import frame
@ -283,7 +283,7 @@ class HTTP2Protocol(object):
def check_alpn(self):
alp = self.tcp_handler.get_alpn_proto_negotiated()
if alp != ALPN_PROTO_H2:
if alp != b'h2':
raise NotImplementedError(
"HTTP2Protocol can not handle unknown ALP: %s" % alp)
return True

View File

@ -25,9 +25,6 @@ ERROR_CODES = BiDi(
CLIENT_CONNECTION_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
ALPN_PROTO_H2 = b'h2'
class Frame(object):
"""

196
netlib/http/message.py Normal file
View File

@ -0,0 +1,196 @@
from __future__ import absolute_import, print_function, division
import warnings
import six
from .. import encoding, utils
CONTENT_MISSING = 0
if six.PY2: # pragma: nocover
_native = lambda x: x
_always_bytes = lambda x: x
else:
# While the HTTP head _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
_native = lambda x: x.decode("utf-8", "surrogateescape")
_always_bytes = lambda x: utils.always_bytes(x, "utf-8", "surrogateescape")
class MessageData(object):
def __eq__(self, other):
if isinstance(other, MessageData):
return self.__dict__ == other.__dict__
return False
def __ne__(self, other):
return not self.__eq__(other)
class Message(object):
def __init__(self, data):
self.data = data
def __eq__(self, other):
if isinstance(other, Message):
return self.data == other.data
return False
def __ne__(self, other):
return not self.__eq__(other)
@property
def headers(self):
"""
Message headers object
Returns:
netlib.http.Headers
"""
return self.data.headers
@headers.setter
def headers(self, h):
self.data.headers = h
@property
def content(self):
"""
The raw (encoded) HTTP message body
See also: :py:attr:`text`
"""
return self.data.content
@content.setter
def content(self, content):
self.data.content = content
if isinstance(content, bytes):
self.headers["content-length"] = str(len(content))
@property
def http_version(self):
"""
Version string, e.g. "HTTP/1.1"
"""
return _native(self.data.http_version)
@http_version.setter
def http_version(self, http_version):
self.data.http_version = _always_bytes(http_version)
@property
def timestamp_start(self):
"""
First byte timestamp
"""
return self.data.timestamp_start
@timestamp_start.setter
def timestamp_start(self, timestamp_start):
self.data.timestamp_start = timestamp_start
@property
def timestamp_end(self):
"""
Last byte timestamp
"""
return self.data.timestamp_end
@timestamp_end.setter
def timestamp_end(self, timestamp_end):
self.data.timestamp_end = timestamp_end
@property
def text(self):
"""
The decoded HTTP message body.
Decoded contents are not cached, so accessing this attribute repeatedly is relatively expensive.
.. note::
This is not implemented yet.
See also: :py:attr:`content`, :py:class:`decoded`
"""
# This attribute should be called text, because that's what requests does.
raise NotImplementedError()
@text.setter
def text(self, text):
raise NotImplementedError()
def decode(self):
"""
Decodes body based on the current Content-Encoding header, then
removes the header. If there is no Content-Encoding header, no
action is taken.
Returns:
True, if decoding succeeded.
False, otherwise.
"""
ce = self.headers.get("content-encoding")
data = encoding.decode(ce, self.content)
if data is None:
return False
self.content = data
self.headers.pop("content-encoding", None)
return True
def encode(self, e):
"""
Encodes body with the encoding e, where e is "gzip", "deflate" or "identity".
Returns:
True, if decoding succeeded.
False, otherwise.
"""
data = encoding.encode(e, self.content)
if data is None:
return False
self.content = data
self.headers["content-encoding"] = e
return True
# Legacy
@property
def body(self): # pragma: nocover
warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning)
return self.content
@body.setter
def body(self, body): # pragma: nocover
warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning)
self.content = body
class decoded(object):
"""
A context manager that decodes a request or response, and then
re-encodes it with the same encoding after execution of the block.
Example:
.. code-block:: python
with decoded(request):
request.content = request.content.replace("foo", "bar")
"""
def __init__(self, message):
self.message = message
ce = message.headers.get("content-encoding")
if ce in encoding.ENCODINGS:
self.ce = ce
else:
self.ce = None
def __enter__(self):
if self.ce:
self.message.decode()
def __exit__(self, type, value, tb):
if self.ce:
self.message.encode(self.ce)

View File

@ -1,345 +0,0 @@
from ..odict import ODict
from .. import utils, encoding
from ..utils import always_bytes, native
from . import cookies
from .headers import Headers
from six.moves import urllib
# TODO: Move somewhere else?
ALPN_PROTO_HTTP1 = b'http/1.1'
ALPN_PROTO_H2 = b'h2'
HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
HDR_FORM_MULTIPART = "multipart/form-data"
CONTENT_MISSING = 0
class Message(object):
def __init__(self, http_version, headers, body, timestamp_start, timestamp_end):
self.http_version = http_version
if not headers:
headers = Headers()
assert isinstance(headers, Headers)
self.headers = headers
self._body = body
self.timestamp_start = timestamp_start
self.timestamp_end = timestamp_end
@property
def body(self):
return self._body
@body.setter
def body(self, body):
self._body = body
if isinstance(body, bytes):
self.headers["content-length"] = str(len(body)).encode()
content = body
def __eq__(self, other):
if isinstance(other, Message):
return self.__dict__ == other.__dict__
return False
class Request(Message):
def __init__(
self,
form_in,
method,
scheme,
host,
port,
path,
http_version,
headers=None,
body=None,
timestamp_start=None,
timestamp_end=None,
form_out=None
):
super(Request, self).__init__(http_version, headers, body, timestamp_start, timestamp_end)
self.form_in = form_in
self.method = method
self.scheme = scheme
self.host = host
self.port = port
self.path = path
self.form_out = form_out or form_in
def __repr__(self):
if self.host and self.port:
hostport = "{}:{}".format(native(self.host,"idna"), self.port)
else:
hostport = ""
path = self.path or ""
return "HTTPRequest({} {}{})".format(
self.method, hostport, path
)
def anticache(self):
"""
Modifies this request to remove headers that might produce a cached
response. That is, we remove ETags and If-Modified-Since headers.
"""
delheaders = [
"if-modified-since",
"if-none-match",
]
for i in delheaders:
self.headers.pop(i, None)
def anticomp(self):
"""
Modifies this request to remove headers that will compress the
resource's data.
"""
self.headers["accept-encoding"] = "identity"
def constrain_encoding(self):
"""
Limits the permissible Accept-Encoding values, based on what we can
decode appropriately.
"""
accept_encoding = self.headers.get("accept-encoding")
if accept_encoding:
self.headers["accept-encoding"] = (
', '.join(
e
for e in encoding.ENCODINGS
if e in accept_encoding
)
)
def update_host_header(self):
"""
Update the host header to reflect the current target.
"""
self.headers["host"] = self.host
def get_form(self):
"""
Retrieves the URL-encoded or multipart form data, returning an ODict object.
Returns an empty ODict if there is no data or the content-type
indicates non-form data.
"""
if self.body:
if HDR_FORM_URLENCODED in self.headers.get("content-type", "").lower():
return self.get_form_urlencoded()
elif HDR_FORM_MULTIPART in self.headers.get("content-type", "").lower():
return self.get_form_multipart()
return ODict([])
def get_form_urlencoded(self):
"""
Retrieves the URL-encoded form data, returning an ODict object.
Returns an empty ODict if there is no data or the content-type
indicates non-form data.
"""
if self.body and HDR_FORM_URLENCODED in self.headers.get("content-type", "").lower():
return ODict(utils.urldecode(self.body))
return ODict([])
def get_form_multipart(self):
if self.body and HDR_FORM_MULTIPART in self.headers.get("content-type", "").lower():
return ODict(
utils.multipartdecode(
self.headers,
self.body))
return ODict([])
def set_form_urlencoded(self, odict):
"""
Sets the body to the URL-encoded form data, and adds the
appropriate content-type header. Note that this will destory the
existing body if there is one.
"""
# FIXME: If there's an existing content-type header indicating a
# url-encoded form, leave it alone.
self.headers["content-type"] = HDR_FORM_URLENCODED
self.body = utils.urlencode(odict.lst)
def get_path_components(self):
"""
Returns the path components of the URL as a list of strings.
Components are unquoted.
"""
_, _, path, _, _, _ = urllib.parse.urlparse(self.url)
return [urllib.parse.unquote(native(i,"ascii")) for i in path.split(b"/") if i]
def set_path_components(self, lst):
"""
Takes a list of strings, and sets the path component of the URL.
Components are quoted.
"""
lst = [urllib.parse.quote(i, safe="") for i in lst]
path = always_bytes("/" + "/".join(lst))
scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url)
self.url = urllib.parse.urlunparse(
[scheme, netloc, path, params, query, fragment]
)
def get_query(self):
"""
Gets the request query string. Returns an ODict object.
"""
_, _, _, _, query, _ = urllib.parse.urlparse(self.url)
if query:
return ODict(utils.urldecode(query))
return ODict([])
def set_query(self, odict):
"""
Takes an ODict object, and sets the request query string.
"""
scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url)
query = utils.urlencode(odict.lst)
self.url = urllib.parse.urlunparse(
[scheme, netloc, path, params, query, fragment]
)
def pretty_host(self, hostheader):
"""
Heuristic to get the host of the request.
Note that pretty_host() does not always return the TCP destination
of the request, e.g. if an upstream proxy is in place
If hostheader is set to True, the Host: header will be used as
additional (and preferred) data source. This is handy in
transparent mode, where only the IO of the destination is known,
but not the resolved name. This is disabled by default, as an
attacker may spoof the host header to confuse an analyst.
"""
if hostheader and "host" in self.headers:
try:
return self.headers["host"]
except ValueError:
pass
if self.host:
return self.host.decode("idna")
def pretty_url(self, hostheader):
if self.form_out == "authority": # upstream proxy mode
return b"%s:%d" % (always_bytes(self.pretty_host(hostheader)), self.port)
return utils.unparse_url(self.scheme,
self.pretty_host(hostheader),
self.port,
self.path)
def get_cookies(self):
"""
Returns a possibly empty netlib.odict.ODict object.
"""
ret = ODict()
for i in self.headers.get_all("Cookie"):
ret.extend(cookies.parse_cookie_header(i))
return ret
def set_cookies(self, odict):
"""
Takes an netlib.odict.ODict object. Over-writes any existing Cookie
headers.
"""
v = cookies.format_cookie_header(odict)
self.headers["cookie"] = v
@property
def url(self):
"""
Returns a URL string, constructed from the Request's URL components.
"""
return utils.unparse_url(
self.scheme,
self.host,
self.port,
self.path
)
@url.setter
def url(self, url):
"""
Parses a URL specification, and updates the Request's information
accordingly.
Raises:
ValueError if the URL was invalid
"""
# TODO: Should handle incoming unicode here.
parts = utils.parse_url(url)
if not parts:
raise ValueError("Invalid URL: %s" % url)
self.scheme, self.host, self.port, self.path = parts
class Response(Message):
def __init__(
self,
http_version,
status_code,
msg=None,
headers=None,
body=None,
timestamp_start=None,
timestamp_end=None,
):
super(Response, self).__init__(http_version, headers, body, timestamp_start, timestamp_end)
self.status_code = status_code
self.msg = msg
def __repr__(self):
# return "Response(%s - %s)" % (self.status_code, self.msg)
if self.body:
size = utils.pretty_size(len(self.body))
else:
size = "content missing"
# TODO: Remove "(unknown content type, content missing)" edge-case
return "<Response: {status_code} {msg} ({contenttype}, {size})>".format(
status_code=self.status_code,
msg=self.msg,
contenttype=self.headers.get("content-type", "unknown content type"),
size=size)
def get_cookies(self):
"""
Get the contents of all Set-Cookie headers.
Returns a possibly empty ODict, where keys are cookie name strings,
and values are [value, attr] lists. Value is a string, and attr is
an ODictCaseless containing cookie attributes. Within attrs, unary
attributes (e.g. HTTPOnly) are indicated by a Null value.
"""
ret = []
for header in self.headers.get_all("set-cookie"):
v = cookies.parse_set_cookie_header(header)
if v:
name, value, attrs = v
ret.append([name, [value, attrs]])
return ODict(ret)
def set_cookies(self, odict):
"""
Set the Set-Cookie headers on this response, over-writing existing
headers.
Accepts an ODict of the same format as that returned by get_cookies.
"""
values = []
for i in odict.lst:
values.append(
cookies.format_set_cookie_header(
i[0],
i[1][0],
i[1][1]
)
)
self.headers.set_all("set-cookie", values)

352
netlib/http/request.py Normal file
View File

@ -0,0 +1,352 @@
from __future__ import absolute_import, print_function, division
import warnings
import six
from six.moves import urllib
from netlib import utils
from netlib.http import cookies
from netlib.odict import ODict
from .. import encoding
from .headers import Headers
from .message import Message, _native, _always_bytes, MessageData
class RequestData(MessageData):
def __init__(self, first_line_format, method, scheme, host, port, path, http_version, headers=None, content=None,
timestamp_start=None, timestamp_end=None):
if not headers:
headers = Headers()
assert isinstance(headers, Headers)
self.first_line_format = first_line_format
self.method = method
self.scheme = scheme
self.host = host
self.port = port
self.path = path
self.http_version = http_version
self.headers = headers
self.content = content
self.timestamp_start = timestamp_start
self.timestamp_end = timestamp_end
class Request(Message):
"""
An HTTP request.
"""
def __init__(self, *args, **kwargs):
data = RequestData(*args, **kwargs)
super(Request, self).__init__(data)
def __repr__(self):
if self.host and self.port:
hostport = "{}:{}".format(self.host, self.port)
else:
hostport = ""
path = self.path or ""
return "Request({} {}{})".format(
self.method, hostport, path
)
@property
def first_line_format(self):
"""
HTTP request form as defined in `RFC7230 <https://tools.ietf.org/html/rfc7230#section-5.3>`_.
origin-form and asterisk-form are subsumed as "relative".
"""
return self.data.first_line_format
@first_line_format.setter
def first_line_format(self, first_line_format):
self.data.first_line_format = first_line_format
@property
def method(self):
"""
HTTP request method, e.g. "GET".
"""
return _native(self.data.method).upper()
@method.setter
def method(self, method):
self.data.method = _always_bytes(method)
@property
def scheme(self):
"""
HTTP request scheme, which should be "http" or "https".
"""
return _native(self.data.scheme)
@scheme.setter
def scheme(self, scheme):
self.data.scheme = _always_bytes(scheme)
@property
def host(self):
"""
Target host. This may be parsed from the raw request
(e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
or inferred from the proxy mode (e.g. an IP in transparent mode).
"""
if six.PY2: # pragma: nocover
return self.data.host
if not self.data.host:
return self.data.host
try:
return self.data.host.decode("idna")
except UnicodeError:
return self.data.host.decode("utf8", "surrogateescape")
@host.setter
def host(self, host):
if isinstance(host, six.text_type):
try:
# There's no non-strict mode for IDNA encoding.
# We don't want this operation to fail though, so we try
# utf8 as a last resort.
host = host.encode("idna", "strict")
except UnicodeError:
host = host.encode("utf8", "surrogateescape")
self.data.host = host
# Update host header
if "host" in self.headers:
if host:
self.headers["host"] = host
else:
self.headers.pop("host")
@property
def port(self):
"""
Target port
"""
return self.data.port
@port.setter
def port(self, port):
self.data.port = port
@property
def path(self):
"""
HTTP request path, e.g. "/index.html".
Guaranteed to start with a slash.
"""
return _native(self.data.path)
@path.setter
def path(self, path):
self.data.path = _always_bytes(path)
@property
def url(self):
"""
The URL string, constructed from the request's URL components
"""
return utils.unparse_url(self.scheme, self.host, self.port, self.path)
@url.setter
def url(self, url):
self.scheme, self.host, self.port, self.path = utils.parse_url(url)
@property
def pretty_host(self):
"""
Similar to :py:attr:`host`, but using the Host headers as an additional preferred data source.
This is useful in transparent mode where :py:attr:`host` is only an IP address,
but may not reflect the actual destination as the Host header could be spoofed.
"""
return self.headers.get("host", self.host)
@property
def pretty_url(self):
"""
Like :py:attr:`url`, but using :py:attr:`pretty_host` instead of :py:attr:`host`.
"""
if self.first_line_format == "authority":
return "%s:%d" % (self.pretty_host, self.port)
return utils.unparse_url(self.scheme, self.pretty_host, self.port, self.path)
@property
def query(self):
"""
The request query string as an :py:class:`ODict` object.
None, if there is no query.
"""
_, _, _, _, query, _ = urllib.parse.urlparse(self.url)
if query:
return ODict(utils.urldecode(query))
return None
@query.setter
def query(self, odict):
query = utils.urlencode(odict.lst)
scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url)
self.url = urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment])
@property
def cookies(self):
"""
The request cookies.
An empty :py:class:`ODict` object if the cookie monster ate them all.
"""
ret = ODict()
for i in self.headers.get_all("Cookie"):
ret.extend(cookies.parse_cookie_header(i))
return ret
@cookies.setter
def cookies(self, odict):
self.headers["cookie"] = cookies.format_cookie_header(odict)
@property
def path_components(self):
"""
The URL's path components as a list of strings.
Components are unquoted.
"""
_, _, path, _, _, _ = urllib.parse.urlparse(self.url)
return [urllib.parse.unquote(i) for i in path.split("/") if i]
@path_components.setter
def path_components(self, components):
components = map(lambda x: urllib.parse.quote(x, safe=""), components)
path = "/" + "/".join(components)
scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url)
self.url = urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment])
def anticache(self):
"""
Modifies this request to remove headers that might produce a cached
response. That is, we remove ETags and If-Modified-Since headers.
"""
delheaders = [
"if-modified-since",
"if-none-match",
]
for i in delheaders:
self.headers.pop(i, None)
def anticomp(self):
"""
Modifies this request to remove headers that will compress the
resource's data.
"""
self.headers["accept-encoding"] = "identity"
def constrain_encoding(self):
"""
Limits the permissible Accept-Encoding values, based on what we can
decode appropriately.
"""
accept_encoding = self.headers.get("accept-encoding")
if accept_encoding:
self.headers["accept-encoding"] = (
', '.join(
e
for e in encoding.ENCODINGS
if e in accept_encoding
)
)
@property
def urlencoded_form(self):
"""
The URL-encoded form data as an :py:class:`ODict` object.
None if there is no data or the content-type indicates non-form data.
"""
is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower()
if self.content and is_valid_content_type:
return ODict(utils.urldecode(self.content))
return None
@urlencoded_form.setter
def urlencoded_form(self, odict):
"""
Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
This will overwrite the existing content if there is one.
"""
self.headers["content-type"] = "application/x-www-form-urlencoded"
self.content = utils.urlencode(odict.lst)
@property
def multipart_form(self):
"""
The multipart form data as an :py:class:`ODict` object.
None if there is no data or the content-type indicates non-form data.
"""
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
if self.content and is_valid_content_type:
return ODict(utils.multipartdecode(self.headers,self.content))
return None
@multipart_form.setter
def multipart_form(self):
raise NotImplementedError()
# Legacy
def get_cookies(self): # pragma: nocover
warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning)
return self.cookies
def set_cookies(self, odict): # pragma: nocover
warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning)
self.cookies = odict
def get_query(self): # pragma: nocover
warnings.warn(".get_query is deprecated, use .query instead.", DeprecationWarning)
return self.query or ODict([])
def set_query(self, odict): # pragma: nocover
warnings.warn(".set_query is deprecated, use .query instead.", DeprecationWarning)
self.query = odict
def get_path_components(self): # pragma: nocover
warnings.warn(".get_path_components is deprecated, use .path_components instead.", DeprecationWarning)
return self.path_components
def set_path_components(self, lst): # pragma: nocover
warnings.warn(".set_path_components is deprecated, use .path_components instead.", DeprecationWarning)
self.path_components = lst
def get_form_urlencoded(self): # pragma: nocover
warnings.warn(".get_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning)
return self.urlencoded_form or ODict([])
def set_form_urlencoded(self, odict): # pragma: nocover
warnings.warn(".set_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning)
self.urlencoded_form = odict
def get_form_multipart(self): # pragma: nocover
warnings.warn(".get_form_multipart is deprecated, use .multipart_form instead.", DeprecationWarning)
return self.multipart_form or ODict([])
@property
def form_in(self): # pragma: nocover
warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning)
return self.first_line_format
@form_in.setter
def form_in(self, form_in): # pragma: nocover
warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning)
self.first_line_format = form_in
@property
def form_out(self): # pragma: nocover
warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning)
return self.first_line_format
@form_out.setter
def form_out(self, form_out): # pragma: nocover
warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning)
self.first_line_format = form_out

117
netlib/http/response.py Normal file
View File

@ -0,0 +1,117 @@
from __future__ import absolute_import, print_function, division
import warnings
from . import cookies
from .headers import Headers
from .message import Message, _native, _always_bytes, MessageData
from .. import utils
from ..odict import ODict
class ResponseData(MessageData):
def __init__(self, http_version, status_code, reason=None, headers=None, content=None,
timestamp_start=None, timestamp_end=None):
if not headers:
headers = Headers()
assert isinstance(headers, Headers)
self.http_version = http_version
self.status_code = status_code
self.reason = reason
self.headers = headers
self.content = content
self.timestamp_start = timestamp_start
self.timestamp_end = timestamp_end
class Response(Message):
"""
An HTTP response.
"""
def __init__(self, *args, **kwargs):
data = ResponseData(*args, **kwargs)
super(Response, self).__init__(data)
def __repr__(self):
if self.content:
details = "{}, {}".format(
self.headers.get("content-type", "unknown content type"),
utils.pretty_size(len(self.content))
)
else:
details = "no content"
return "Response({status_code} {reason}, {details})".format(
status_code=self.status_code,
reason=self.reason,
details=details
)
@property
def status_code(self):
"""
HTTP Status Code, e.g. ``200``.
"""
return self.data.status_code
@status_code.setter
def status_code(self, status_code):
self.data.status_code = status_code
@property
def reason(self):
"""
HTTP Reason Phrase, e.g. "Not Found".
This is always :py:obj:`None` for HTTP2 requests, because HTTP2 responses do not contain a reason phrase.
"""
return _native(self.data.reason)
@reason.setter
def reason(self, reason):
self.data.reason = _always_bytes(reason)
@property
def cookies(self):
"""
Get the contents of all Set-Cookie headers.
A possibly empty :py:class:`ODict`, where keys are cookie name strings,
and values are [value, attr] lists. Value is a string, and attr is
an ODictCaseless containing cookie attributes. Within attrs, unary
attributes (e.g. HTTPOnly) are indicated by a Null value.
"""
ret = []
for header in self.headers.get_all("set-cookie"):
v = cookies.parse_set_cookie_header(header)
if v:
name, value, attrs = v
ret.append([name, [value, attrs]])
return ODict(ret)
@cookies.setter
def cookies(self, odict):
values = []
for i in odict.lst:
header = cookies.format_set_cookie_header(i[0], i[1][0], i[1][1])
values.append(header)
self.headers.set_all("set-cookie", values)
# Legacy
def get_cookies(self): # pragma: nocover
warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning)
return self.cookies
def set_cookies(self, odict): # pragma: nocover
warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning)
self.cookies = odict
@property
def msg(self): # pragma: nocover
warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning)
return self.reason
@msg.setter
def msg(self, reason): # pragma: nocover
warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning)
self.reason = reason

View File

@ -1,4 +1,4 @@
from __future__ import (absolute_import, print_function, division)
from __future__ import absolute_import, print_function, division
CONTINUE = 100
SWITCHING = 101
@ -37,6 +37,7 @@ REQUEST_URI_TOO_LONG = 414
UNSUPPORTED_MEDIA_TYPE = 415
REQUESTED_RANGE_NOT_SATISFIABLE = 416
EXPECTATION_FAILED = 417
IM_A_TEAPOT = 418
INTERNAL_SERVER_ERROR = 500
NOT_IMPLEMENTED = 501
@ -91,6 +92,7 @@ RESPONSES = {
UNSUPPORTED_MEDIA_TYPE: "Unsupported Media Type",
REQUESTED_RANGE_NOT_SATISFIABLE: "Requested Range not satisfiable",
EXPECTATION_FAILED: "Expectation Failed",
IM_A_TEAPOT: "I'm a teapot",
# 500
INTERNAL_SERVER_ERROR: "Internal Server Error",

View File

@ -98,7 +98,7 @@ def treq(**kwargs):
netlib.http.Request
"""
default = dict(
form_in="relative",
first_line_format="relative",
method=b"GET",
scheme=b"http",
host=b"address",
@ -106,7 +106,7 @@ def treq(**kwargs):
path=b"/path",
http_version=b"HTTP/1.1",
headers=Headers(header="qvalue"),
body=b"content"
content=b"content"
)
default.update(kwargs)
return Request(**default)
@ -120,9 +120,9 @@ def tresp(**kwargs):
default = dict(
http_version=b"HTTP/1.1",
status_code=200,
msg=b"OK",
headers=Headers(header_response=b"svalue"),
body=b"message",
reason=b"OK",
headers=Headers(header_response="svalue"),
content=b"message",
timestamp_start=time.time(),
timestamp_end=time.time(),
)

View File

@ -274,22 +274,27 @@ def get_header_tokens(headers, key):
return [token.strip() for token in tokens]
@always_byte_args()
def hostport(scheme, host, port):
"""
Returns the host component, with a port specifcation if needed.
"""
if (port, scheme) in [(80, b"http"), (443, b"https")]:
if (port, scheme) in [(80, "http"), (443, "https"), (80, b"http"), (443, b"https")]:
return host
else:
return b"%s:%d" % (host, port)
if isinstance(host, six.binary_type):
return b"%s:%d" % (host, port)
else:
return "%s:%d" % (host, port)
def unparse_url(scheme, host, port, path=""):
"""
Returns a URL string, constructed from the specified compnents.
Returns a URL string, constructed from the specified components.
Args:
All args must be str.
"""
return b"%s://%s%s" % (scheme, hostport(scheme, host, port), path)
return "%s://%s%s" % (scheme, hostport(scheme, host, port), path)
def urlencode(s):

View File

@ -25,9 +25,9 @@ class Flow(object):
class Request(object):
def __init__(self, scheme, method, path, http_version, headers, body):
def __init__(self, scheme, method, path, http_version, headers, content):
self.scheme, self.method, self.path = scheme, method, path
self.headers, self.body = headers, body
self.headers, self.content = headers, content
self.http_version = http_version
@ -64,7 +64,7 @@ class WSGIAdaptor(object):
environ = {
'wsgi.version': (1, 0),
'wsgi.url_scheme': native(flow.request.scheme, "latin-1"),
'wsgi.input': BytesIO(flow.request.body or b""),
'wsgi.input': BytesIO(flow.request.content or b""),
'wsgi.errors': errsoc,
'wsgi.multithread': True,
'wsgi.multiprocess': False,

View File

@ -20,7 +20,7 @@ def test_assemble_request():
)
with raises(HttpException):
assemble_request(treq(body=CONTENT_MISSING))
assemble_request(treq(content=CONTENT_MISSING))
def test_assemble_request_head():
@ -40,7 +40,7 @@ def test_assemble_response():
)
with raises(HttpException):
assemble_response(tresp(body=CONTENT_MISSING))
assemble_response(tresp(content=CONTENT_MISSING))
def test_assemble_response_head():
@ -62,31 +62,40 @@ def test_assemble_body():
def test_assemble_request_line():
assert _assemble_request_line(treq()) == b"GET /path HTTP/1.1"
assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1"
authority_request = treq(method=b"CONNECT", form_in="authority")
authority_request = treq(method=b"CONNECT", first_line_format="authority").data
assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1"
absolute_request = treq(form_in="absolute")
absolute_request = treq(first_line_format="absolute").data
assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1"
with raises(RuntimeError):
_assemble_request_line(treq(), "invalid_form")
_assemble_request_line(treq(first_line_format="invalid_form").data)
def test_assemble_request_headers():
# https://github.com/mitmproxy/mitmproxy/issues/186
r = treq(body=b"")
r = treq(content=b"")
r.headers["Transfer-Encoding"] = "chunked"
c = _assemble_request_headers(r)
c = _assemble_request_headers(r.data)
assert b"Transfer-Encoding" in c
assert b"host" in _assemble_request_headers(treq(headers=Headers()))
def test_assemble_request_headers_host_header():
r = treq()
r.headers = Headers()
c = _assemble_request_headers(r.data)
assert b"host" in c
r.host = None
c = _assemble_request_headers(r.data)
assert b"host" not in c
def test_assemble_response_headers():
# https://github.com/mitmproxy/mitmproxy/issues/186
r = tresp(body=b"")
r = tresp(content=b"")
r.headers["Transfer-Encoding"] = "chunked"
c = _assemble_response_headers(r)
assert b"Transfer-Encoding" in c

View File

@ -2,7 +2,7 @@ from __future__ import absolute_import, print_function, division
from io import BytesIO
import textwrap
from mock import Mock
from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect
from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect
from netlib.http import Headers
from netlib.http.http1.read import (
read_request, read_response, read_request_head,
@ -16,8 +16,8 @@ from netlib.tutils import treq, tresp, raises
def test_read_request():
rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip")
r = read_request(rfile)
assert r.method == b"GET"
assert r.body == b""
assert r.method == "GET"
assert r.content == b""
assert r.timestamp_end
assert rfile.read() == b"skip"
@ -32,9 +32,9 @@ def test_read_request_head():
rfile.reset_timestamps = Mock()
rfile.first_byte_timestamp = 42
r = read_request_head(rfile)
assert r.method == b"GET"
assert r.method == "GET"
assert r.headers["Content-Length"] == "4"
assert r.body is None
assert r.content is None
assert rfile.reset_timestamps.called
assert r.timestamp_start == 42
assert rfile.read() == b"skip"
@ -45,7 +45,7 @@ def test_read_response():
rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody")
r = read_response(rfile, req)
assert r.status_code == 418
assert r.body == b"body"
assert r.content == b"body"
assert r.timestamp_end
@ -61,7 +61,7 @@ def test_read_response_head():
r = read_response_head(rfile)
assert r.status_code == 418
assert r.headers["Content-Length"] == "4"
assert r.body is None
assert r.content is None
assert rfile.reset_timestamps.called
assert r.timestamp_start == 42
assert rfile.read() == b"skip"
@ -100,6 +100,11 @@ class TestReadBody(object):
with raises(HttpException):
b"".join(read_body(rfile, -1, 3))
def test_max_chunk_size(self):
rfile = BytesIO(b"123456")
assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"]
rfile = BytesIO(b"123456")
assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"]
def test_connection_close():
headers = Headers()
@ -112,6 +117,9 @@ def test_connection_close():
headers["connection"] = "close"
assert connection_close(b"HTTP/1.1", headers)
headers["connection"] = "foobar"
assert connection_close(b"HTTP/1.0", headers)
assert not connection_close(b"HTTP/1.1", headers)
def test_expected_http_body_size():
# Expect: 100-continue
@ -169,6 +177,11 @@ def test_get_first_line():
rfile = BytesIO(b"")
_get_first_line(rfile)
with raises(HttpReadDisconnect):
rfile = Mock()
rfile.readline.side_effect = TcpDisconnect
_get_first_line(rfile)
def test_read_request_line():
def t(b):
@ -187,7 +200,8 @@ def test_read_request_line():
t(b"GET / WTF/1.1")
with raises(HttpSyntaxException):
t(b"this is not http")
with raises(HttpReadDisconnect):
t(b"")
def test_parse_authority_form():
assert _parse_authority_form(b"foo:42") == (b"foo", 42)
@ -218,6 +232,8 @@ def test_read_response_line():
t(b"HTTP/1.1 OK OK")
with raises(HttpSyntaxException):
t(b"WTF/1.1 200 OK")
with raises(HttpReadDisconnect):
t(b"")
def test_check_http_version():
@ -283,7 +299,7 @@ class TestReadHeaders(object):
def test_read_chunked():
req = treq(body=None)
req = treq(content=None)
req.headers["Transfer-Encoding"] = "chunked"
data = b"1\r\na\r\n0\r\n"

View File

@ -65,7 +65,7 @@ class TestProtocol:
class TestCheckALPNMatch(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
alpn_select=ALPN_PROTO_H2,
alpn_select=b'h2',
)
if OpenSSL._util.lib.Cryptography_HAS_ALPN:
@ -73,7 +73,7 @@ class TestCheckALPNMatch(tservers.ServerTestBase):
def test_check_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2])
c.convert_to_ssl(alpn_protos=[b'h2'])
protocol = HTTP2Protocol(c)
assert protocol.check_alpn()
@ -89,7 +89,7 @@ class TestCheckALPNMismatch(tservers.ServerTestBase):
def test_check_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2])
c.convert_to_ssl(alpn_protos=[b'h2'])
protocol = HTTP2Protocol(c)
tutils.raises(NotImplementedError, protocol.check_alpn)
@ -311,7 +311,7 @@ class TestReadRequest(tservers.ServerTestBase):
assert req.stream_id
assert req.headers.fields == [[':method', 'GET'], [':path', '/'], [':scheme', 'https']]
assert req.body == b'foobar'
assert req.content == b'foobar'
class TestReadRequestRelative(tservers.ServerTestBase):
@ -417,7 +417,7 @@ class TestReadResponse(tservers.ServerTestBase):
assert resp.status_code == 200
assert resp.msg == ""
assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']]
assert resp.body == b'foobar'
assert resp.content == b'foobar'
assert resp.timestamp_end
@ -444,7 +444,7 @@ class TestReadEmptyResponse(tservers.ServerTestBase):
assert resp.status_code == 200
assert resp.msg == ""
assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']]
assert resp.body == b''
assert resp.content == b''
class TestAssembleRequest(object):

View File

@ -21,6 +21,7 @@ def test_read_quoted_string():
[(r'"f\\o" x', 0), (r"f\o", 6)],
[(r'"f\\" x', 0), (r"f" + '\\', 5)],
[('"fo\\\"" x', 0), ("fo\"", 6)],
[('"foo" x', 7), ("", 8)],
]
for q, a in tokens:
assert cookies._read_quoted_string(*q) == a

View File

@ -38,6 +38,9 @@ class TestHeaders(object):
assert headers["Host"] == "example.com"
assert headers["Accept"] == "text/plain"
with raises(ValueError):
Headers([[b"Host", u"not-bytes"]])
def test_getitem(self):
headers = Headers(Host="example.com")
assert headers["Host"] == "example.com"

153
test/http/test_message.py Normal file
View File

@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
from netlib.http import decoded, Headers
from netlib.tutils import tresp, raises
def _test_passthrough_attr(message, attr):
assert getattr(message, attr) == getattr(message.data, attr)
setattr(message, attr, "foo")
assert getattr(message.data, attr) == "foo"
def _test_decoded_attr(message, attr):
assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
# Set str, get raw bytes
setattr(message, attr, "foo")
assert getattr(message.data, attr) == b"foo"
# Set raw bytes, get decoded
setattr(message.data, attr, b"bar")
assert getattr(message, attr) == "bar"
# Set bytes, get raw bytes
setattr(message, attr, b"baz")
assert getattr(message.data, attr) == b"baz"
# Set UTF8
setattr(message, attr, "Non-Autorisé")
assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9"
# Don't fail on garbage
setattr(message.data, attr, b"foo\xFF\x00bar")
assert getattr(message, attr).startswith("foo")
assert getattr(message, attr).endswith("bar")
# foo.bar = foo.bar should not cause any side effects.
d = getattr(message, attr)
setattr(message, attr, d)
assert getattr(message.data, attr) == b"foo\xFF\x00bar"
class TestMessageData(object):
def test_eq_ne(self):
data = tresp(timestamp_start=42, timestamp_end=42).data
same = tresp(timestamp_start=42, timestamp_end=42).data
assert data == same
assert not data != same
other = tresp(content=b"foo").data
assert not data == other
assert data != other
assert data != 0
class TestMessage(object):
def test_init(self):
resp = tresp()
assert resp.data
def test_eq_ne(self):
resp = tresp(timestamp_start=42, timestamp_end=42)
same = tresp(timestamp_start=42, timestamp_end=42)
assert resp == same
assert not resp != same
other = tresp(timestamp_start=0, timestamp_end=0)
assert not resp == other
assert resp != other
assert resp != 0
def test_content_length_update(self):
resp = tresp()
resp.content = b"foo"
assert resp.data.content == b"foo"
assert resp.headers["content-length"] == "3"
resp.content = b""
assert resp.data.content == b""
assert resp.headers["content-length"] == "0"
def test_content_basic(self):
_test_passthrough_attr(tresp(), "content")
def test_headers(self):
_test_passthrough_attr(tresp(), "headers")
def test_timestamp_start(self):
_test_passthrough_attr(tresp(), "timestamp_start")
def test_timestamp_end(self):
_test_passthrough_attr(tresp(), "timestamp_end")
def teste_http_version(self):
_test_decoded_attr(tresp(), "http_version")
class TestDecodedDecorator(object):
def test_simple(self):
r = tresp()
assert r.content == b"message"
assert "content-encoding" not in r.headers
assert r.encode("gzip")
assert r.headers["content-encoding"]
assert r.content != b"message"
with decoded(r):
assert "content-encoding" not in r.headers
assert r.content == b"message"
assert r.headers["content-encoding"]
assert r.content != b"message"
def test_modify(self):
r = tresp()
assert "content-encoding" not in r.headers
assert r.encode("gzip")
with decoded(r):
r.content = b"foo"
assert r.content != b"foo"
r.decode()
assert r.content == b"foo"
def test_unknown_ce(self):
r = tresp()
r.headers["content-encoding"] = "zopfli"
r.content = b"foo"
with decoded(r):
assert r.headers["content-encoding"]
assert r.content == b"foo"
assert r.headers["content-encoding"]
assert r.content == b"foo"
def test_cannot_decode(self):
r = tresp()
assert r.encode("gzip")
r.content = b"foo"
with decoded(r):
assert r.headers["content-encoding"]
assert r.content == b"foo"
assert r.headers["content-encoding"]
assert r.content != b"foo"
r.decode()
assert r.content == b"foo"
def test_cannot_encode(self):
r = tresp()
assert r.encode("gzip")
with decoded(r):
r.content = None
assert "content-encoding" not in r.headers
assert r.content is None

View File

@ -1,395 +0,0 @@
import mock
from netlib import tutils
from netlib import utils
from netlib.odict import ODict, ODictCaseless
from netlib.http import Request, Response, Headers, CONTENT_MISSING, HDR_FORM_URLENCODED, \
HDR_FORM_MULTIPART
class TestRequest(object):
def test_repr(self):
r = tutils.treq()
assert repr(r)
def test_headers(self):
tutils.raises(AssertionError, Request,
'form_in',
'method',
'scheme',
'host',
'port',
'path',
b"HTTP/1.1",
'foobar',
)
req = Request(
'form_in',
'method',
'scheme',
'host',
'port',
'path',
b"HTTP/1.1",
)
assert isinstance(req.headers, Headers)
def test_equal(self):
a = tutils.treq(timestamp_start=42, timestamp_end=43)
b = tutils.treq(timestamp_start=42, timestamp_end=43)
assert a == b
assert not a == 'foo'
assert not b == 'foo'
assert not 'foo' == a
assert not 'foo' == b
def test_anticache(self):
req = tutils.treq()
req.headers["If-Modified-Since"] = "foo"
req.headers["If-None-Match"] = "bar"
req.anticache()
assert "If-Modified-Since" not in req.headers
assert "If-None-Match" not in req.headers
def test_anticomp(self):
req = tutils.treq()
req.headers["Accept-Encoding"] = "foobar"
req.anticomp()
assert req.headers["Accept-Encoding"] == "identity"
def test_constrain_encoding(self):
req = tutils.treq()
req.headers["Accept-Encoding"] = "identity, gzip, foo"
req.constrain_encoding()
assert "foo" not in req.headers["Accept-Encoding"]
def test_update_host(self):
req = tutils.treq()
req.headers["Host"] = ""
req.host = "foobar"
req.update_host_header()
assert req.headers["Host"] == "foobar"
def test_get_form(self):
req = tutils.treq()
assert req.get_form() == ODict()
@mock.patch("netlib.http.Request.get_form_multipart")
@mock.patch("netlib.http.Request.get_form_urlencoded")
def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart):
req = tutils.treq()
assert req.get_form() == ODict()
req = tutils.treq()
req.body = "foobar"
req.headers["Content-Type"] = HDR_FORM_URLENCODED
req.get_form()
assert req.get_form_urlencoded.called
assert not req.get_form_multipart.called
@mock.patch("netlib.http.Request.get_form_multipart")
@mock.patch("netlib.http.Request.get_form_urlencoded")
def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart):
req = tutils.treq()
req.body = "foobar"
req.headers["Content-Type"] = HDR_FORM_MULTIPART
req.get_form()
assert not req.get_form_urlencoded.called
assert req.get_form_multipart.called
def test_get_form_urlencoded(self):
req = tutils.treq(body="foobar")
assert req.get_form_urlencoded() == ODict()
req.headers["Content-Type"] = HDR_FORM_URLENCODED
assert req.get_form_urlencoded() == ODict(utils.urldecode(req.body))
def test_get_form_multipart(self):
req = tutils.treq(body="foobar")
assert req.get_form_multipart() == ODict()
req.headers["Content-Type"] = HDR_FORM_MULTIPART
assert req.get_form_multipart() == ODict(
utils.multipartdecode(
req.headers,
req.body
)
)
def test_set_form_urlencoded(self):
req = tutils.treq()
req.set_form_urlencoded(ODict([('foo', 'bar'), ('rab', 'oof')]))
assert req.headers["Content-Type"] == HDR_FORM_URLENCODED
assert req.body
def test_get_path_components(self):
req = tutils.treq()
assert req.get_path_components()
# TODO: add meaningful assertions
def test_set_path_components(self):
req = tutils.treq()
req.set_path_components([b"foo", b"bar"])
# TODO: add meaningful assertions
def test_get_query(self):
req = tutils.treq()
assert req.get_query().lst == []
req.url = "http://localhost:80/foo?bar=42"
assert req.get_query().lst == [(b"bar", b"42")]
def test_set_query(self):
req = tutils.treq()
req.set_query(ODict([]))
def test_pretty_host(self):
r = tutils.treq()
assert r.pretty_host(True) == "address"
assert r.pretty_host(False) == "address"
r.headers["host"] = "other"
assert r.pretty_host(True) == "other"
assert r.pretty_host(False) == "address"
r.host = None
assert r.pretty_host(True) == "other"
assert r.pretty_host(False) is None
del r.headers["host"]
assert r.pretty_host(True) is None
assert r.pretty_host(False) is None
# Invalid IDNA
r.headers["host"] = ".disqus.com"
assert r.pretty_host(True) == ".disqus.com"
def test_pretty_url(self):
req = tutils.treq()
req.form_out = "authority"
assert req.pretty_url(True) == b"address:22"
assert req.pretty_url(False) == b"address:22"
req.form_out = "relative"
assert req.pretty_url(True) == b"http://address:22/path"
assert req.pretty_url(False) == b"http://address:22/path"
def test_get_cookies_none(self):
headers = Headers()
r = tutils.treq()
r.headers = headers
assert len(r.get_cookies()) == 0
def test_get_cookies_single(self):
r = tutils.treq()
r.headers = Headers(cookie="cookiename=cookievalue")
result = r.get_cookies()
assert len(result) == 1
assert result['cookiename'] == ['cookievalue']
def test_get_cookies_double(self):
r = tutils.treq()
r.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
result = r.get_cookies()
assert len(result) == 2
assert result['cookiename'] == ['cookievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_get_cookies_withequalsign(self):
r = tutils.treq()
r.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
result = r.get_cookies()
assert len(result) == 2
assert result['cookiename'] == ['coo=kievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_set_cookies(self):
r = tutils.treq()
r.headers = Headers(cookie="cookiename=cookievalue")
result = r.get_cookies()
result["cookiename"] = ["foo"]
r.set_cookies(result)
assert r.get_cookies()["cookiename"] == ["foo"]
def test_set_url(self):
r = tutils.treq(form_in="absolute")
r.url = b"https://otheraddress:42/ORLY"
assert r.scheme == b"https"
assert r.host == b"otheraddress"
assert r.port == 42
assert r.path == b"/ORLY"
try:
r.url = "//localhost:80/foo@bar"
assert False
except:
assert True
# def test_asterisk_form_in(self):
# f = tutils.tflow(req=None)
# protocol = mock_protocol("OPTIONS * HTTP/1.1")
# f.request = HTTPRequest.from_protocol(protocol)
#
# assert f.request.form_in == "relative"
# f.request.host = f.server_conn.address.host
# f.request.port = f.server_conn.address.port
# f.request.scheme = "http"
# assert protocol.assemble(f.request) == (
# "OPTIONS * HTTP/1.1\r\n"
# "Host: address:22\r\n"
# "Content-Length: 0\r\n\r\n")
#
# def test_relative_form_in(self):
# protocol = mock_protocol("GET /foo\xff HTTP/1.1")
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
#
# protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
# r = HTTPRequest.from_protocol(protocol)
# assert r.headers["Upgrade"] == ["h2c"]
#
# def test_expect_header(self):
# protocol = mock_protocol(
# "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
# r = HTTPRequest.from_protocol(protocol)
# assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
# assert r.content == "foo"
# assert protocol.tcp_handler.rfile.read(3) == "bar"
#
# def test_authority_form_in(self):
# protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
#
# protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
# r = HTTPRequest.from_protocol(protocol)
# r.scheme, r.host, r.port = "http", "address", 22
# assert protocol.assemble(r) == (
# "CONNECT address:22 HTTP/1.1\r\n"
# "Host: address:22\r\n"
# "Content-Length: 0\r\n\r\n")
# assert r.pretty_url(False) == "address:22"
#
# def test_absolute_form_in(self):
# protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
#
# protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
# r = HTTPRequest.from_protocol(protocol)
# assert protocol.assemble(r) == (
# "GET http://address:22/ HTTP/1.1\r\n"
# "Host: address:22\r\n"
# "Content-Length: 0\r\n\r\n")
#
# def test_http_options_relative_form_in(self):
# """
# Exercises fix for Issue #392.
# """
# protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
# r = HTTPRequest.from_protocol(protocol)
# r.host = 'address'
# r.port = 80
# r.scheme = "http"
# assert protocol.assemble(r) == (
# "OPTIONS /secret/resource HTTP/1.1\r\n"
# "Host: address\r\n"
# "Content-Length: 0\r\n\r\n")
#
# def test_http_options_absolute_form_in(self):
# protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
# r = HTTPRequest.from_protocol(protocol)
# r.host = 'address'
# r.port = 80
# r.scheme = "http"
# assert protocol.assemble(r) == (
# "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
# "Host: address\r\n"
# "Content-Length: 0\r\n\r\n")
class TestResponse(object):
def test_headers(self):
tutils.raises(AssertionError, Response,
b"HTTP/1.1",
200,
headers='foobar',
)
resp = Response(
b"HTTP/1.1",
200,
)
assert isinstance(resp.headers, Headers)
def test_equal(self):
a = tutils.tresp(timestamp_start=42, timestamp_end=43)
b = tutils.tresp(timestamp_start=42, timestamp_end=43)
assert a == b
assert not a == 'foo'
assert not b == 'foo'
assert not 'foo' == a
assert not 'foo' == b
def test_repr(self):
r = tutils.tresp()
assert "unknown content type" in repr(r)
r.headers["content-type"] = "foo"
assert "foo" in repr(r)
assert repr(tutils.tresp(body=CONTENT_MISSING))
def test_get_cookies_none(self):
resp = tutils.tresp()
resp.headers = Headers()
assert not resp.get_cookies()
def test_get_cookies_simple(self):
resp = tutils.tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
def test_get_cookies_with_parameters(self):
resp = tutils.tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == "cookievalue"
attrs = result["cookiename"][0][1]
assert len(attrs) == 4
assert attrs["domain"] == ["example.com"]
assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
assert attrs["path"] == ["/"]
assert attrs["httponly"] == [None]
def test_get_cookies_no_value(self):
resp = tutils.tresp()
resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == ""
assert len(result["cookiename"][0][1]) == 2
def test_get_cookies_twocookies(self):
resp = tutils.tresp()
resp.headers = Headers([
[b"Set-Cookie", b"cookiename=cookievalue"],
[b"Set-Cookie", b"othercookie=othervalue"]
])
result = resp.get_cookies()
assert len(result) == 2
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
assert "othercookie" in result
assert result["othercookie"][0] == ["othervalue", ODict()]
def test_set_cookies(self):
resp = tutils.tresp()
v = resp.get_cookies()
v.add("foo", ["bar", ODictCaseless()])
resp.set_cookies(v)
v = resp.get_cookies()
assert len(v) == 1
assert v["foo"] == [["bar", ODictCaseless()]]

238
test/http/test_request.py Normal file
View File

@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import six
from netlib import utils
from netlib.http import Headers
from netlib.odict import ODict
from netlib.tutils import treq, raises
from .test_message import _test_decoded_attr, _test_passthrough_attr
class TestRequestData(object):
def test_init(self):
with raises(AssertionError):
treq(headers="foobar")
assert isinstance(treq(headers=None).headers, Headers)
class TestRequestCore(object):
"""
Tests for builtins and the attributes that are directly proxied from the data structure
"""
def test_repr(self):
request = treq()
assert repr(request) == "Request(GET address:22/path)"
request.host = None
assert repr(request) == "Request(GET /path)"
def test_first_line_format(self):
_test_passthrough_attr(treq(), "first_line_format")
def test_method(self):
_test_decoded_attr(treq(), "method")
def test_scheme(self):
_test_decoded_attr(treq(), "scheme")
def test_port(self):
_test_passthrough_attr(treq(), "port")
def test_path(self):
_test_decoded_attr(treq(), "path")
def test_host(self):
if six.PY2:
from unittest import SkipTest
raise SkipTest()
request = treq()
assert request.host == request.data.host.decode("idna")
# Test IDNA encoding
# Set str, get raw bytes
request.host = "ídna.example"
assert request.data.host == b"xn--dna-qma.example"
# Set raw bytes, get decoded
request.data.host = b"xn--idn-gla.example"
assert request.host == "idná.example"
# Set bytes, get raw bytes
request.host = b"xn--dn-qia9b.example"
assert request.data.host == b"xn--dn-qia9b.example"
# IDNA encoding is not bijective
request.host = "fußball"
assert request.host == "fussball"
# Don't fail on garbage
request.data.host = b"foo\xFF\x00bar"
assert request.host.startswith("foo")
assert request.host.endswith("bar")
# foo.bar = foo.bar should not cause any side effects.
d = request.host
request.host = d
assert request.data.host == b"foo\xFF\x00bar"
def test_host_header_update(self):
request = treq()
assert "host" not in request.headers
request.host = "example.com"
assert "host" not in request.headers
request.headers["Host"] = "foo"
request.host = "example.org"
assert request.headers["Host"] == "example.org"
class TestRequestUtils(object):
"""
Tests for additional convenience methods.
"""
def test_url(self):
request = treq()
assert request.url == "http://address:22/path"
request.url = "https://otheraddress:42/foo"
assert request.scheme == "https"
assert request.host == "otheraddress"
assert request.port == 42
assert request.path == "/foo"
with raises(ValueError):
request.url = "not-a-url"
def test_pretty_host(self):
request = treq()
assert request.pretty_host == "address"
assert request.host == "address"
request.headers["host"] = "other"
assert request.pretty_host == "other"
assert request.host == "address"
request.host = None
assert request.pretty_host is None
assert request.host is None
# Invalid IDNA
request.headers["host"] = ".disqus.com"
assert request.pretty_host == ".disqus.com"
def test_pretty_url(self):
request = treq()
assert request.url == "http://address:22/path"
assert request.pretty_url == "http://address:22/path"
request.headers["host"] = "other"
assert request.pretty_url == "http://other:22/path"
def test_pretty_url_authority(self):
request = treq(first_line_format="authority")
assert request.pretty_url == "address:22"
def test_get_query(self):
request = treq()
assert request.query is None
request.url = "http://localhost:80/foo?bar=42"
assert request.query.lst == [("bar", "42")]
def test_set_query(self):
request = treq()
request.query = ODict([])
def test_get_cookies_none(self):
request = treq()
request.headers = Headers()
assert len(request.cookies) == 0
def test_get_cookies_single(self):
request = treq()
request.headers = Headers(cookie="cookiename=cookievalue")
result = request.cookies
assert len(result) == 1
assert result['cookiename'] == ['cookievalue']
def test_get_cookies_double(self):
request = treq()
request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
result = request.cookies
assert len(result) == 2
assert result['cookiename'] == ['cookievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_get_cookies_withequalsign(self):
request = treq()
request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
result = request.cookies
assert len(result) == 2
assert result['cookiename'] == ['coo=kievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_set_cookies(self):
request = treq()
request.headers = Headers(cookie="cookiename=cookievalue")
result = request.cookies
result["cookiename"] = ["foo"]
request.cookies = result
assert request.cookies["cookiename"] == ["foo"]
def test_get_path_components(self):
request = treq(path=b"/foo/bar")
assert request.path_components == ["foo", "bar"]
def test_set_path_components(self):
request = treq()
request.path_components = ["foo", "baz"]
assert request.path == "/foo/baz"
request.path_components = []
assert request.path == "/"
def test_anticache(self):
request = treq()
request.headers["If-Modified-Since"] = "foo"
request.headers["If-None-Match"] = "bar"
request.anticache()
assert "If-Modified-Since" not in request.headers
assert "If-None-Match" not in request.headers
def test_anticomp(self):
request = treq()
request.headers["Accept-Encoding"] = "foobar"
request.anticomp()
assert request.headers["Accept-Encoding"] == "identity"
def test_constrain_encoding(self):
request = treq()
h = request.headers.copy()
request.constrain_encoding() # no-op if there is no accept_encoding header.
assert request.headers == h
request.headers["Accept-Encoding"] = "identity, gzip, foo"
request.constrain_encoding()
assert "foo" not in request.headers["Accept-Encoding"]
assert "gzip" in request.headers["Accept-Encoding"]
def test_get_urlencoded_form(self):
request = treq(content="foobar")
assert request.urlencoded_form is None
request.headers["Content-Type"] = "application/x-www-form-urlencoded"
assert request.urlencoded_form == ODict(utils.urldecode(request.content))
def test_set_urlencoded_form(self):
request = treq()
request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')])
assert request.headers["Content-Type"] == "application/x-www-form-urlencoded"
assert request.content
def test_get_multipart_form(self):
request = treq(content="foobar")
assert request.multipart_form is None
request.headers["Content-Type"] = "multipart/form-data"
assert request.multipart_form == ODict(
utils.multipartdecode(
request.headers,
request.content
)
)

100
test/http/test_response.py Normal file
View File

@ -0,0 +1,100 @@
from __future__ import absolute_import, print_function, division
from netlib.http import Headers
from netlib.odict import ODict, ODictCaseless
from netlib.tutils import raises, tresp
from .test_message import _test_passthrough_attr, _test_decoded_attr
class TestResponseData(object):
def test_init(self):
with raises(AssertionError):
tresp(headers="foobar")
assert isinstance(tresp(headers=None).headers, Headers)
class TestResponseCore(object):
"""
Tests for builtins and the attributes that are directly proxied from the data structure
"""
def test_repr(self):
response = tresp()
assert repr(response) == "Response(200 OK, unknown content type, 7B)"
response.content = None
assert repr(response) == "Response(200 OK, no content)"
def test_status_code(self):
_test_passthrough_attr(tresp(), "status_code")
def test_reason(self):
_test_decoded_attr(tresp(), "reason")
class TestResponseUtils(object):
"""
Tests for additional convenience methods.
"""
def test_get_cookies_none(self):
resp = tresp()
resp.headers = Headers()
assert not resp.cookies
def test_get_cookies_empty(self):
resp = tresp()
resp.headers = Headers(set_cookie="")
assert not resp.cookies
def test_get_cookies_simple(self):
resp = tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue")
result = resp.cookies
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
def test_get_cookies_with_parameters(self):
resp = tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
result = resp.cookies
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == "cookievalue"
attrs = result["cookiename"][0][1]
assert len(attrs) == 4
assert attrs["domain"] == ["example.com"]
assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
assert attrs["path"] == ["/"]
assert attrs["httponly"] == [None]
def test_get_cookies_no_value(self):
resp = tresp()
resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
result = resp.cookies
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == ""
assert len(result["cookiename"][0][1]) == 2
def test_get_cookies_twocookies(self):
resp = tresp()
resp.headers = Headers([
[b"Set-Cookie", b"cookiename=cookievalue"],
[b"Set-Cookie", b"othercookie=othervalue"]
])
result = resp.cookies
assert len(result) == 2
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
assert "othercookie" in result
assert result["othercookie"][0] == ["othervalue", ODict()]
def test_set_cookies(self):
resp = tresp()
v = resp.cookies
v.add("foo", ["bar", ODictCaseless()])
resp.set_cookies(v)
v = resp.cookies
assert len(v) == 1
assert v["foo"] == [["bar", ODictCaseless()]]

View File

@ -0,0 +1,6 @@
from netlib.http import status_codes
def test_simple():
assert status_codes.IM_A_TEAPOT == 418
assert status_codes.RESPONSES[418] == "I'm a teapot"

View File

@ -84,10 +84,10 @@ def test_parse_url():
def test_unparse_url():
assert utils.unparse_url(b"http", b"foo.com", 99, b"") == b"http://foo.com:99"
assert utils.unparse_url(b"http", b"foo.com", 80, b"/bar") == b"http://foo.com/bar"
assert utils.unparse_url(b"https", b"foo.com", 80, b"") == b"https://foo.com:80"
assert utils.unparse_url(b"https", b"foo.com", 443, b"") == b"https://foo.com"
assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99"
assert utils.unparse_url("http", "foo.com", 80, "/bar") == "http://foo.com/bar"
assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80"
assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com"
def test_urlencode():

View File

@ -68,7 +68,7 @@ class WebSocketsClient(tcp.TCPClient):
self.wfile.write(bytes(headers) + b"\r\n")
self.wfile.flush()
resp = read_response(self.rfile, treq(method="GET"))
resp = read_response(self.rfile, treq(method=b"GET"))
server_nonce = self.protocol.check_server_handshake(resp.headers)
if not server_nonce == self.protocol.create_server_nonce(self.client_nonce):