test response model, push coverage to 100% branch cov

This commit is contained in:
Maximilian Hils 2015-09-27 00:49:41 +02:00
parent 466888b01a
commit 23d13e4c12
11 changed files with 207 additions and 171 deletions

View File

@ -58,6 +58,7 @@ def _read_quoted_string(s, start):
escaping = False escaping = False
ret = [] ret = []
# Skip the first quote # Skip the first quote
i = start # initialize in case the loop doesn't run.
for i in range(start + 1, len(s)): for i in range(start + 1, len(s)):
if escaping: if escaping:
ret.append(s[i]) ret.append(s[i])

View File

@ -18,6 +18,16 @@ else:
_always_bytes = lambda x: utils.always_bytes(x, "utf-8", "surrogateescape") _always_bytes = lambda x: utils.always_bytes(x, "utf-8", "surrogateescape")
class MessageData(object):
def __eq__(self, other):
if isinstance(other, MessageData):
return self.__dict__ == other.__dict__
return False
def __ne__(self, other):
return not self.__eq__(other)
class Message(object): class Message(object):
def __init__(self, data): def __init__(self, data):
self.data = data self.data = data

View File

@ -10,10 +10,10 @@ from netlib.http import cookies
from netlib.odict import ODict from netlib.odict import ODict
from .. import encoding from .. import encoding
from .headers import Headers from .headers import Headers
from .message import Message, _native, _always_bytes from .message import Message, _native, _always_bytes, MessageData
class RequestData(object): class RequestData(MessageData):
def __init__(self, first_line_format, method, scheme, host, port, path, http_version, headers=None, content=None, def __init__(self, first_line_format, method, scheme, host, port, path, http_version, headers=None, content=None,
timestamp_start=None, timestamp_end=None): timestamp_start=None, timestamp_end=None):
if not headers: if not headers:
@ -32,14 +32,6 @@ class RequestData(object):
self.timestamp_start = timestamp_start self.timestamp_start = timestamp_start
self.timestamp_end = timestamp_end self.timestamp_end = timestamp_end
def __eq__(self, other):
if isinstance(other, RequestData):
return self.__dict__ == other.__dict__
return False
def __ne__(self, other):
return not self.__eq__(other)
class Request(Message): class Request(Message):
""" """

View File

@ -4,12 +4,12 @@ import warnings
from . import cookies from . import cookies
from .headers import Headers from .headers import Headers
from .message import Message, _native, _always_bytes from .message import Message, _native, _always_bytes, MessageData
from .. import utils from .. import utils
from ..odict import ODict from ..odict import ODict
class ResponseData(object): class ResponseData(MessageData):
def __init__(self, http_version, status_code, reason=None, headers=None, content=None, def __init__(self, http_version, status_code, reason=None, headers=None, content=None,
timestamp_start=None, timestamp_end=None): timestamp_start=None, timestamp_end=None):
if not headers: if not headers:
@ -24,14 +24,6 @@ class ResponseData(object):
self.timestamp_start = timestamp_start self.timestamp_start = timestamp_start
self.timestamp_end = timestamp_end self.timestamp_end = timestamp_end
def __eq__(self, other):
if isinstance(other, ResponseData):
return self.__dict__ == other.__dict__
return False
def __ne__(self, other):
return not self.__eq__(other)
class Response(Message): class Response(Message):
""" """
@ -48,7 +40,7 @@ class Response(Message):
utils.pretty_size(len(self.content)) utils.pretty_size(len(self.content))
) )
else: else:
details = "content missing" details = "no content"
return "Response({status_code} {reason}, {details})".format( return "Response({status_code} {reason}, {details})".format(
status_code=self.status_code, status_code=self.status_code,
reason=self.reason, reason=self.reason,

View File

@ -78,10 +78,19 @@ def test_assemble_request_headers():
# https://github.com/mitmproxy/mitmproxy/issues/186 # https://github.com/mitmproxy/mitmproxy/issues/186
r = treq(content=b"") r = treq(content=b"")
r.headers["Transfer-Encoding"] = "chunked" r.headers["Transfer-Encoding"] = "chunked"
c = _assemble_request_headers(r) c = _assemble_request_headers(r.data)
assert b"Transfer-Encoding" in c assert b"Transfer-Encoding" in c
assert b"host" in _assemble_request_headers(treq(headers=Headers()))
def test_assemble_request_headers_host_header():
r = treq()
r.headers = Headers()
c = _assemble_request_headers(r.data)
assert b"host" in c
r.host = None
c = _assemble_request_headers(r.data)
assert b"host" not in c
def test_assemble_response_headers(): def test_assemble_response_headers():

View File

@ -117,6 +117,9 @@ def test_connection_close():
headers["connection"] = "close" headers["connection"] = "close"
assert connection_close(b"HTTP/1.1", headers) assert connection_close(b"HTTP/1.1", headers)
headers["connection"] = "foobar"
assert connection_close(b"HTTP/1.0", headers)
assert not connection_close(b"HTTP/1.1", headers)
def test_expected_http_body_size(): def test_expected_http_body_size():
# Expect: 100-continue # Expect: 100-continue

View File

@ -21,6 +21,7 @@ def test_read_quoted_string():
[(r'"f\\o" x', 0), (r"f\o", 6)], [(r'"f\\o" x', 0), (r"f\o", 6)],
[(r'"f\\" x', 0), (r"f" + '\\', 5)], [(r'"f\\" x', 0), (r"f" + '\\', 5)],
[('"fo\\\"" x', 0), ("fo\"", 6)], [('"fo\\\"" x', 0), ("fo\"", 6)],
[('"foo" x', 7), ("", 8)],
] ]
for q, a in tokens: for q, a in tokens:
assert cookies._read_quoted_string(*q) == a assert cookies._read_quoted_string(*q) == a

View File

@ -1,20 +1,17 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division from __future__ import absolute_import, print_function, division
from netlib.http import decoded from netlib.http import decoded, Headers
from netlib.tutils import tresp from netlib.tutils import tresp, raises
def _test_passthrough_attr(message, attr): def _test_passthrough_attr(message, attr):
def t(self=None):
assert getattr(message, attr) == getattr(message.data, attr) assert getattr(message, attr) == getattr(message.data, attr)
setattr(message, attr, "foo") setattr(message, attr, "foo")
assert getattr(message.data, attr) == "foo" assert getattr(message.data, attr) == "foo"
return t
def _test_decoded_attr(message, attr): def _test_decoded_attr(message, attr):
def t(self=None):
assert getattr(message, attr) == getattr(message.data, attr).decode("utf8") assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
# Set str, get raw bytes # Set str, get raw bytes
setattr(message, attr, "foo") setattr(message, attr, "foo")
@ -37,7 +34,20 @@ def _test_decoded_attr(message, attr):
d = getattr(message, attr) d = getattr(message, attr)
setattr(message, attr, d) setattr(message, attr, d)
assert getattr(message.data, attr) == b"foo\xFF\x00bar" assert getattr(message.data, attr) == b"foo\xFF\x00bar"
return t
class TestMessageData(object):
def test_eq_ne(self):
data = tresp(timestamp_start=42, timestamp_end=42).data
same = tresp(timestamp_start=42, timestamp_end=42).data
assert data == same
assert not data != same
other = tresp(content=b"foo").data
assert not data == other
assert data != other
assert data != 0
class TestMessage(object): class TestMessage(object):
@ -67,12 +77,20 @@ class TestMessage(object):
assert resp.data.content == b"" assert resp.data.content == b""
assert resp.headers["content-length"] == "0" assert resp.headers["content-length"] == "0"
test_content_basic = _test_passthrough_attr(tresp(), "content") def test_content_basic(self):
test_headers = _test_passthrough_attr(tresp(), "headers") _test_passthrough_attr(tresp(), "content")
test_timestamp_start = _test_passthrough_attr(tresp(), "timestamp_start")
test_timestamp_end = _test_passthrough_attr(tresp(), "timestamp_end")
test_http_version = _test_decoded_attr(tresp(), "http_version") def test_headers(self):
_test_passthrough_attr(tresp(), "headers")
def test_timestamp_start(self):
_test_passthrough_attr(tresp(), "timestamp_start")
def test_timestamp_end(self):
_test_passthrough_attr(tresp(), "timestamp_end")
def teste_http_version(self):
_test_decoded_attr(tresp(), "http_version")
class TestDecodedDecorator(object): class TestDecodedDecorator(object):
@ -133,4 +151,3 @@ class TestDecodedDecorator(object):
assert "content-encoding" not in r.headers assert "content-encoding" not in r.headers
assert r.content is None assert r.content is None

View File

@ -1,94 +0,0 @@
from netlib import tutils
from netlib.odict import ODict, ODictCaseless
from netlib.http import Response, Headers, CONTENT_MISSING
class TestResponse(object):
def test_headers(self):
tutils.raises(AssertionError, Response,
b"HTTP/1.1",
200,
headers='foobar',
)
resp = Response(
b"HTTP/1.1",
200,
)
assert isinstance(resp.headers, Headers)
def test_equal(self):
a = tutils.tresp(timestamp_start=42, timestamp_end=43)
b = tutils.tresp(timestamp_start=42, timestamp_end=43)
assert a == b
assert not a == 'foo'
assert not b == 'foo'
assert not 'foo' == a
assert not 'foo' == b
def test_repr(self):
r = tutils.tresp()
assert "unknown content type" in repr(r)
r.headers["content-type"] = "foo"
assert "foo" in repr(r)
assert repr(tutils.tresp(content=CONTENT_MISSING))
def test_get_cookies_none(self):
resp = tutils.tresp()
resp.headers = Headers()
assert not resp.get_cookies()
def test_get_cookies_simple(self):
resp = tutils.tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
def test_get_cookies_with_parameters(self):
resp = tutils.tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == "cookievalue"
attrs = result["cookiename"][0][1]
assert len(attrs) == 4
assert attrs["domain"] == ["example.com"]
assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
assert attrs["path"] == ["/"]
assert attrs["httponly"] == [None]
def test_get_cookies_no_value(self):
resp = tutils.tresp()
resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == ""
assert len(result["cookiename"][0][1]) == 2
def test_get_cookies_twocookies(self):
resp = tutils.tresp()
resp.headers = Headers([
[b"Set-Cookie", b"cookiename=cookievalue"],
[b"Set-Cookie", b"othercookie=othervalue"]
])
result = resp.get_cookies()
assert len(result) == 2
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
assert "othercookie" in result
assert result["othercookie"][0] == ["othervalue", ODict()]
def test_set_cookies(self):
resp = tutils.tresp()
v = resp.get_cookies()
v.add("foo", ["bar", ODictCaseless()])
resp.set_cookies(v)
v = resp.get_cookies()
assert len(v) == 1
assert v["foo"] == [["bar", ODictCaseless()]]

View File

@ -17,31 +17,31 @@ class TestRequestData(object):
assert isinstance(treq(headers=None).headers, Headers) assert isinstance(treq(headers=None).headers, Headers)
def test_eq_ne(self):
request_data = treq().data
same = treq().data
assert request_data == same
assert not request_data != same
other = treq(content=b"foo").data
assert not request_data == other
assert request_data != other
assert request_data != 0
class TestRequestCore(object): class TestRequestCore(object):
"""
Tests for builtins and the attributes that are directly proxied from the data structure
"""
def test_repr(self): def test_repr(self):
request = treq() request = treq()
assert repr(request) == "Request(GET address:22/path)" assert repr(request) == "Request(GET address:22/path)"
request.host = None request.host = None
assert repr(request) == "Request(GET /path)" assert repr(request) == "Request(GET /path)"
test_first_line_format = _test_passthrough_attr(treq(), "first_line_format") def test_first_line_format(self):
test_method = _test_decoded_attr(treq(), "method") _test_passthrough_attr(treq(), "first_line_format")
test_scheme = _test_decoded_attr(treq(), "scheme")
test_port = _test_passthrough_attr(treq(), "port") def test_method(self):
test_path = _test_decoded_attr(treq(), "path") _test_decoded_attr(treq(), "method")
def test_scheme(self):
_test_decoded_attr(treq(), "scheme")
def test_port(self):
_test_passthrough_attr(treq(), "port")
def test_path(self):
_test_decoded_attr(treq(), "path")
def test_host(self): def test_host(self):
if six.PY2: if six.PY2:
@ -86,6 +86,9 @@ class TestRequestCore(object):
class TestRequestUtils(object): class TestRequestUtils(object):
"""
Tests for additional convenience methods.
"""
def test_url(self): def test_url(self):
request = treq() request = treq()
assert request.url == "http://address:22/path" assert request.url == "http://address:22/path"
@ -199,6 +202,11 @@ class TestRequestUtils(object):
def test_constrain_encoding(self): def test_constrain_encoding(self):
request = treq() request = treq()
h = request.headers.copy()
request.constrain_encoding() # no-op if there is no accept_encoding header.
assert request.headers == h
request.headers["Accept-Encoding"] = "identity, gzip, foo" request.headers["Accept-Encoding"] = "identity, gzip, foo"
request.constrain_encoding() request.constrain_encoding()
assert "foo" not in request.headers["Accept-Encoding"] assert "foo" not in request.headers["Accept-Encoding"]

View File

@ -1,3 +1,100 @@
from __future__ import absolute_import, print_function, division from __future__ import absolute_import, print_function, division
# TODO from netlib.http import Headers
from netlib.odict import ODict, ODictCaseless
from netlib.tutils import raises, tresp
from .test_message import _test_passthrough_attr, _test_decoded_attr
class TestResponseData(object):
def test_init(self):
with raises(AssertionError):
tresp(headers="foobar")
assert isinstance(tresp(headers=None).headers, Headers)
class TestResponseCore(object):
"""
Tests for builtins and the attributes that are directly proxied from the data structure
"""
def test_repr(self):
response = tresp()
assert repr(response) == "Response(200 OK, unknown content type, 7B)"
response.content = None
assert repr(response) == "Response(200 OK, no content)"
def test_status_code(self):
_test_passthrough_attr(tresp(), "status_code")
def test_reason(self):
_test_decoded_attr(tresp(), "reason")
class TestResponseUtils(object):
"""
Tests for additional convenience methods.
"""
def test_get_cookies_none(self):
resp = tresp()
resp.headers = Headers()
assert not resp.cookies
def test_get_cookies_empty(self):
resp = tresp()
resp.headers = Headers(set_cookie="")
assert not resp.cookies
def test_get_cookies_simple(self):
resp = tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue")
result = resp.cookies
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
def test_get_cookies_with_parameters(self):
resp = tresp()
resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly")
result = resp.cookies
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == "cookievalue"
attrs = result["cookiename"][0][1]
assert len(attrs) == 4
assert attrs["domain"] == ["example.com"]
assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
assert attrs["path"] == ["/"]
assert attrs["httponly"] == [None]
def test_get_cookies_no_value(self):
resp = tresp()
resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/")
result = resp.cookies
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == ""
assert len(result["cookiename"][0][1]) == 2
def test_get_cookies_twocookies(self):
resp = tresp()
resp.headers = Headers([
[b"Set-Cookie", b"cookiename=cookievalue"],
[b"Set-Cookie", b"othercookie=othervalue"]
])
result = resp.cookies
assert len(result) == 2
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", ODict()]
assert "othercookie" in result
assert result["othercookie"][0] == ["othervalue", ODict()]
def test_set_cookies(self):
resp = tresp()
v = resp.cookies
v.add("foo", ["bar", ODictCaseless()])
resp.set_cookies(v)
v = resp.cookies
assert len(v) == 1
assert v["foo"] == [["bar", ODictCaseless()]]