mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-30 11:19:23 +00:00
574 lines
19 KiB
Python
574 lines
19 KiB
Python
import mock
|
|
|
|
from netlib import http
|
|
from netlib import odict
|
|
from netlib import tutils
|
|
from netlib import utils
|
|
from netlib.http import semantics
|
|
from netlib.http.semantics import CONTENT_MISSING
|
|
|
|
class TestProtocolMixin(object):
    """Exercise the request/response dispatch of ``ProtocolMixin.assemble``."""

    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_response")
    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_request")
    def test_assemble_request(self, mock_request_method, mock_response_method):
        """Assembling a request must only reach ``assemble_request``."""
        # Patch decorators are applied bottom-up, so the mock for
        # ``assemble_request`` arrives as the first extra argument.
        semantics.ProtocolMixin().assemble(tutils.treq())

        assert mock_request_method.called
        assert not mock_response_method.called

    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_response")
    @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_request")
    def test_assemble_response(self, mock_request_method, mock_response_method):
        """Assembling a response must only reach ``assemble_response``."""
        semantics.ProtocolMixin().assemble(tutils.tresp())

        assert not mock_request_method.called
        assert mock_response_method.called

    def test_assemble_foo(self):
        """Anything that is neither request nor response is rejected."""
        assemble = semantics.ProtocolMixin().assemble
        tutils.raises(ValueError, assemble, 'foo')
|
|
|
|
class TestRequest(object):
    """Tests for ``semantics.Request``, mostly driven by ``tutils.treq()``."""

    def test_repr(self):
        """``repr`` of a request is a non-empty string."""
        r = tutils.treq()
        assert repr(r)

    def test_headers(self):
        """Constructor validates its trailing argument and defaults headers."""
        # The eighth positional argument (presumably ``headers`` -- confirm
        # against the constructor) is a plain string here and must trip an
        # assertion inside the constructor.
        tutils.raises(
            AssertionError,
            semantics.Request,
            'form_in',
            'method',
            'scheme',
            'host',
            'port',
            'path',
            (1, 1),
            'foobar',
        )

        # Leaving that argument out must still give a usable Headers object.
        req = semantics.Request(
            'form_in',
            'method',
            'scheme',
            'host',
            'port',
            'path',
            (1, 1),
        )
        assert isinstance(req.headers, http.Headers)

    def test_equal(self):
        """Two fixture requests compare equal; neither equals a string."""
        a = tutils.treq()
        b = tutils.treq()
        assert a == b

        # Check both operand orders so the reflected comparison is covered.
        assert not a == 'foo'
        assert not b == 'foo'
        assert not 'foo' == a
        assert not 'foo' == b

    def test_legacy_first_line(self):
        """Request line rendering for each supported form, plus a bad form."""
        req = tutils.treq()

        assert req.legacy_first_line('relative') == "GET /path HTTP/1.1"
        assert req.legacy_first_line('authority') == "GET address:22 HTTP/1.1"
        assert req.legacy_first_line('absolute') == "GET http://address:22/path HTTP/1.1"
        tutils.raises(http.HttpError, req.legacy_first_line, 'foobar')

    def test_anticache(self):
        """``anticache`` drops the conditional-request headers."""
        req = tutils.treq()
        req.headers["If-Modified-Since"] = "foo"
        req.headers["If-None-Match"] = "bar"
        req.anticache()
        assert "If-Modified-Since" not in req.headers
        assert "If-None-Match" not in req.headers

    def test_anticomp(self):
        """``anticomp`` forces ``Accept-Encoding`` to ``identity``."""
        req = tutils.treq()
        req.headers["Accept-Encoding"] = "foobar"
        req.anticomp()
        assert req.headers["Accept-Encoding"] == "identity"

    def test_constrain_encoding(self):
        """``constrain_encoding`` removes the unknown ``foo`` encoding."""
        req = tutils.treq()
        req.headers["Accept-Encoding"] = "identity, gzip, foo"
        req.constrain_encoding()
        assert "foo" not in req.headers["Accept-Encoding"]

    def test_update_host(self):
        """``update_host_header`` copies ``req.host`` into the Host header."""
        req = tutils.treq()
        req.headers["Host"] = ""
        req.host = "foobar"
        req.update_host_header()
        assert req.headers["Host"] == "foobar"

    def test_get_form(self):
        """The unmodified fixture request yields an empty form."""
        req = tutils.treq()
        assert req.get_form() == odict.ODict()

    @mock.patch("netlib.http.semantics.Request.get_form_multipart")
    @mock.patch("netlib.http.semantics.Request.get_form_urlencoded")
    def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart):
        """A urlencoded content type routes ``get_form`` to the urlencoded parser."""
        req = tutils.treq()
        assert req.get_form() == odict.ODict()

        req = tutils.treq()
        req.body = "foobar"
        req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED
        req.get_form()
        # Both parsers are patched on the class, so the attributes read from
        # the instance below are the mocks.
        assert req.get_form_urlencoded.called
        assert not req.get_form_multipart.called

    @mock.patch("netlib.http.semantics.Request.get_form_multipart")
    @mock.patch("netlib.http.semantics.Request.get_form_urlencoded")
    def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart):
        """A multipart content type routes ``get_form`` to the multipart parser."""
        req = tutils.treq()
        req.body = "foobar"
        req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART
        req.get_form()
        assert not req.get_form_urlencoded.called
        assert req.get_form_multipart.called

    def test_get_form_urlencoded(self):
        """Urlencoded parsing is empty until the matching content type is set."""
        req = tutils.treq("foobar")
        assert req.get_form_urlencoded() == odict.ODict()

        req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED
        assert req.get_form_urlencoded() == odict.ODict(utils.urldecode(req.body))

    def test_get_form_multipart(self):
        """Multipart parsing is empty until the matching content type is set."""
        req = tutils.treq("foobar")
        assert req.get_form_multipart() == odict.ODict()

        req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART
        assert req.get_form_multipart() == odict.ODict(
            utils.multipartdecode(
                req.headers,
                req.body
            )
        )

    def test_set_form_urlencoded(self):
        """Setting a urlencoded form sets the content type and a body."""
        req = tutils.treq()
        req.set_form_urlencoded(odict.ODict([('foo', 'bar'), ('rab', 'oof')]))
        assert req.headers["Content-Type"] == semantics.HDR_FORM_URLENCODED
        assert req.body

    def test_get_path_components(self):
        """Smoke test: the fixture request has at least one path component."""
        req = tutils.treq()
        assert req.get_path_components()
        # TODO: add meaningful assertions

    def test_set_path_components(self):
        """Smoke test: setting path components does not raise."""
        req = tutils.treq()
        req.set_path_components(["foo", "bar"])
        # TODO: add meaningful assertions

    def test_get_query(self):
        """Query is empty for the fixture and follows a URL reassignment."""
        req = tutils.treq()
        assert req.get_query().lst == []

        req.url = "http://localhost:80/foo?bar=42"
        assert req.get_query().lst == [("bar", "42")]

    def test_set_query(self):
        """Smoke test: setting an empty query does not raise."""
        req = tutils.treq()
        req.set_query(odict.ODict([]))

    def test_pretty_host(self):
        """``pretty_host`` with and without preferring the Host header."""
        r = tutils.treq()
        # No Host header yet: both modes return the request's own host.
        assert r.pretty_host(True) == "address"
        assert r.pretty_host(False) == "address"
        # With a Host header, only the ``True`` mode picks it up.
        r.headers["host"] = "other"
        assert r.pretty_host(True) == "other"
        assert r.pretty_host(False) == "address"
        r.host = None
        assert r.pretty_host(True) == "other"
        assert r.pretty_host(False) is None
        del r.headers["host"]
        assert r.pretty_host(True) is None
        assert r.pretty_host(False) is None

        # Invalid IDNA
        r.headers["host"] = ".disqus.com"
        assert r.pretty_host(True) == ".disqus.com"

    def test_pretty_url(self):
        """``pretty_url`` output for authority-form and relative-form requests."""
        req = tutils.treq()
        req.form_out = "authority"
        assert req.pretty_url(True) == "address:22"
        assert req.pretty_url(False) == "address:22"

        req.form_out = "relative"
        assert req.pretty_url(True) == "http://address:22/path"
        assert req.pretty_url(False) == "http://address:22/path"

    def test_get_cookies_none(self):
        """A request with empty headers has no cookies."""
        headers = http.Headers()
        r = tutils.treq()
        r.headers = headers
        assert len(r.get_cookies()) == 0

    def test_get_cookies_single(self):
        """A single ``name=value`` pair is parsed into one cookie."""
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=cookievalue")
        result = r.get_cookies()
        assert len(result) == 1
        assert result['cookiename'] == ['cookievalue']

    def test_get_cookies_double(self):
        """Two ``;``-separated pairs are parsed into two cookies."""
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
        result = r.get_cookies()
        assert len(result) == 2
        assert result['cookiename'] == ['cookievalue']
        assert result['othercookiename'] == ['othercookievalue']

    def test_get_cookies_withequalsign(self):
        """An ``=`` inside a cookie value is kept as part of the value."""
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
        result = r.get_cookies()
        assert len(result) == 2
        assert result['cookiename'] == ['coo=kievalue']
        assert result['othercookiename'] == ['othercookievalue']

    def test_set_cookies(self):
        """Cookies written with ``set_cookies`` are read back by ``get_cookies``."""
        r = tutils.treq()
        r.headers = http.Headers(cookie="cookiename=cookievalue")
        result = r.get_cookies()
        result["cookiename"] = ["foo"]
        r.set_cookies(result)
        assert r.get_cookies()["cookiename"] == ["foo"]

    def test_set_url(self):
        """Assigning ``url`` updates the parts; an invalid URL is rejected."""
        r = tutils.treq_absolute()
        r.url = "https://otheraddress:42/ORLY"
        assert r.scheme == "https"
        assert r.host == "otheraddress"
        assert r.port == 42
        assert r.path == "/ORLY"

        # The failure assertion lives in the ``else`` branch on purpose: with
        # ``assert False`` inside the ``try`` and a bare ``except``, the
        # AssertionError itself was swallowed and this check could never fail.
        try:
            r.url = "//localhost:80/foo@bar"
        except Exception:
            # NOTE(review): the setter presumably raises ValueError; narrow
            # this once confirmed against the implementation.
            pass
        else:
            assert False, "assigning an invalid URL did not raise"
|
|
|
|
# def test_asterisk_form_in(self):
|
|
# f = tutils.tflow(req=None)
|
|
# protocol = mock_protocol("OPTIONS * HTTP/1.1")
|
|
# f.request = HTTPRequest.from_protocol(protocol)
|
|
#
|
|
# assert f.request.form_in == "relative"
|
|
# f.request.host = f.server_conn.address.host
|
|
# f.request.port = f.server_conn.address.port
|
|
# f.request.scheme = "http"
|
|
# assert protocol.assemble(f.request) == (
|
|
# "OPTIONS * HTTP/1.1\r\n"
|
|
# "Host: address:22\r\n"
|
|
# "Content-Length: 0\r\n\r\n")
|
|
#
|
|
# def test_relative_form_in(self):
|
|
# protocol = mock_protocol("GET /foo\xff HTTP/1.1")
|
|
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
|
#
|
|
# protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
|
|
# r = HTTPRequest.from_protocol(protocol)
|
|
# assert r.headers["Upgrade"] == ["h2c"]
|
|
#
|
|
# def test_expect_header(self):
|
|
# protocol = mock_protocol(
|
|
# "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
|
|
# r = HTTPRequest.from_protocol(protocol)
|
|
# assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
|
|
# assert r.content == "foo"
|
|
# assert protocol.tcp_handler.rfile.read(3) == "bar"
|
|
#
|
|
# def test_authority_form_in(self):
|
|
# protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
|
|
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
|
#
|
|
# protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
|
|
# r = HTTPRequest.from_protocol(protocol)
|
|
# r.scheme, r.host, r.port = "http", "address", 22
|
|
# assert protocol.assemble(r) == (
|
|
# "CONNECT address:22 HTTP/1.1\r\n"
|
|
# "Host: address:22\r\n"
|
|
# "Content-Length: 0\r\n\r\n")
|
|
# assert r.pretty_url(False) == "address:22"
|
|
#
|
|
# def test_absolute_form_in(self):
|
|
# protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
|
|
# tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
|
|
#
|
|
# protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
|
|
# r = HTTPRequest.from_protocol(protocol)
|
|
# assert protocol.assemble(r) == (
|
|
# "GET http://address:22/ HTTP/1.1\r\n"
|
|
# "Host: address:22\r\n"
|
|
# "Content-Length: 0\r\n\r\n")
|
|
#
|
|
# def test_http_options_relative_form_in(self):
|
|
# """
|
|
# Exercises fix for Issue #392.
|
|
# """
|
|
# protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
|
|
# r = HTTPRequest.from_protocol(protocol)
|
|
# r.host = 'address'
|
|
# r.port = 80
|
|
# r.scheme = "http"
|
|
# assert protocol.assemble(r) == (
|
|
# "OPTIONS /secret/resource HTTP/1.1\r\n"
|
|
# "Host: address\r\n"
|
|
# "Content-Length: 0\r\n\r\n")
|
|
#
|
|
# def test_http_options_absolute_form_in(self):
|
|
# protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
|
|
# r = HTTPRequest.from_protocol(protocol)
|
|
# r.host = 'address'
|
|
# r.port = 80
|
|
# r.scheme = "http"
|
|
# assert protocol.assemble(r) == (
|
|
# "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
|
|
# "Host: address\r\n"
|
|
# "Content-Length: 0\r\n\r\n")
|
|
|
|
class TestEmptyRequest(object):
    """Smoke test for ``semantics.EmptyRequest``."""

    def test_init(self):
        """An ``EmptyRequest`` can be built without arguments and is truthy."""
        empty = semantics.EmptyRequest()
        assert empty
|
|
|
|
class TestResponse(object):
    """Tests for ``semantics.Response``, mostly driven by ``tutils.tresp()``."""

    def test_headers(self):
        """A string ``headers`` value is rejected; the default is a Headers object."""
        tutils.raises(
            AssertionError,
            semantics.Response,
            (1, 1),
            200,
            headers='foobar',
        )

        response = semantics.Response((1, 1), 200)
        assert isinstance(response.headers, http.Headers)

    def test_equal(self):
        """Two fixture responses compare equal; neither equals a string."""
        first = tutils.tresp()
        second = tutils.tresp()
        assert first == second

        # Both operand orders are exercised for each response.
        assert not first == 'foo'
        assert not second == 'foo'
        assert not 'foo' == first
        assert not 'foo' == second

    def test_repr(self):
        """``repr`` mentions the content type, or its absence."""
        response = tutils.tresp()
        assert "unknown content type" in repr(response)

        response.headers["content-type"] = "foo"
        assert "foo" in repr(response)

        # A response whose content is missing must still be representable.
        assert repr(tutils.tresp(content=CONTENT_MISSING))

    def test_get_cookies_none(self):
        """A response with empty headers yields no cookies."""
        response = tutils.tresp()
        response.headers = http.Headers()
        assert not response.get_cookies()

    def test_get_cookies_simple(self):
        """One bare ``Set-Cookie`` gives a value plus an empty attribute dict."""
        response = tutils.tresp()
        response.headers = http.Headers(set_cookie="cookiename=cookievalue")

        cookies = response.get_cookies()
        assert len(cookies) == 1
        assert "cookiename" in cookies
        assert cookies["cookiename"][0] == ["cookievalue", odict.ODict()]

    def test_get_cookies_with_parameters(self):
        """Cookie attributes are parsed alongside the cookie value."""
        response = tutils.tresp()
        # Adjacent literals are concatenated at compile time, so the header
        # value is one unbroken string.
        response.headers = http.Headers(
            set_cookie=(
                "cookiename=cookievalue;domain=example.com;"
                "expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"
            )
        )

        cookies = response.get_cookies()
        assert len(cookies) == 1
        assert "cookiename" in cookies

        value, attrs = cookies["cookiename"][0][0], cookies["cookiename"][0][1]
        assert value == "cookievalue"
        assert len(attrs) == 4
        assert attrs["domain"] == ["example.com"]
        assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
        assert attrs["path"] == ["/"]
        # A flag attribute without ``=`` is stored with a ``None`` value.
        assert attrs["httponly"] == [None]

    def test_get_cookies_no_value(self):
        """An empty cookie value is kept as ``""`` with its two attributes."""
        response = tutils.tresp()
        response.headers = http.Headers(
            set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
        )

        cookies = response.get_cookies()
        assert len(cookies) == 1
        assert "cookiename" in cookies
        assert cookies["cookiename"][0][0] == ""
        assert len(cookies["cookiename"][0][1]) == 2

    def test_get_cookies_twocookies(self):
        """Two separate ``Set-Cookie`` headers yield two cookies."""
        response = tutils.tresp()
        response.headers = http.Headers([
            ["Set-Cookie", "cookiename=cookievalue"],
            ["Set-Cookie", "othercookie=othervalue"],
        ])

        cookies = response.get_cookies()
        assert len(cookies) == 2
        assert "cookiename" in cookies
        assert cookies["cookiename"][0] == ["cookievalue", odict.ODict()]
        assert "othercookie" in cookies
        assert cookies["othercookie"][0] == ["othervalue", odict.ODict()]

    def test_set_cookies(self):
        """A cookie added through ``set_cookies`` is read back unchanged."""
        response = tutils.tresp()
        jar = response.get_cookies()
        jar.add("foo", ["bar", odict.ODictCaseless()])
        response.set_cookies(jar)

        jar = response.get_cookies()
        assert len(jar) == 1
        assert jar["foo"] == [["bar", odict.ODictCaseless()]]
|
|
|
|
|
|
class TestHeaders(object):
    """Tests for the ``semantics.Headers`` container."""

    def _2host(self):
        """Build Headers holding two ``Host`` fields that differ only in case."""
        fields = [
            ["Host", "example.com"],
            ["host", "example.org"],
        ]
        return semantics.Headers(fields)

    def test_init(self):
        """Construction from nothing, a field list, keywords, or both."""
        hdrs = semantics.Headers()
        assert len(hdrs) == 0

        hdrs = semantics.Headers([["Host", "example.com"]])
        assert len(hdrs) == 1
        assert hdrs["Host"] == "example.com"

        hdrs = semantics.Headers(Host="example.com")
        assert len(hdrs) == 1
        assert hdrs["Host"] == "example.com"

        # A keyword argument replaces a same-named entry from the field list.
        hdrs = semantics.Headers([["Host", "invalid"]], Host="example.com")
        assert len(hdrs) == 1
        assert hdrs["Host"] == "example.com"

        hdrs = semantics.Headers(
            [["Host", "invalid"], ["Accept", "text/plain"]],
            Host="example.com"
        )
        assert len(hdrs) == 2
        assert hdrs["Host"] == "example.com"
        assert hdrs["Accept"] == "text/plain"

    def test_getitem(self):
        """Lookup ignores case, raises on a miss, and joins repeated fields."""
        hdrs = semantics.Headers(Host="example.com")
        assert hdrs["Host"] == "example.com"
        assert hdrs["host"] == "example.com"
        tutils.raises(KeyError, hdrs.__getitem__, "Accept")

        hdrs = self._2host()
        assert hdrs["Host"] == "example.com, example.org"

    def test_str(self):
        """``str`` renders one CRLF-terminated line per field."""
        hdrs = semantics.Headers(Host="example.com")
        assert str(hdrs) == "Host: example.com\r\n"

        hdrs = semantics.Headers([
            ["Host", "example.com"],
            ["Accept", "text/plain"],
        ])
        assert str(hdrs) == "Host: example.com\r\nAccept: text/plain\r\n"

    def test_setitem(self):
        """Assignment ignores case and collapses repeated fields."""
        hdrs = semantics.Headers()
        hdrs["Host"] = "example.com"
        assert "Host" in hdrs
        assert "host" in hdrs
        assert hdrs["Host"] == "example.com"

        hdrs["host"] = "example.org"
        assert "Host" in hdrs
        assert "host" in hdrs
        assert hdrs["Host"] == "example.org"

        hdrs["accept"] = "text/plain"
        assert len(hdrs) == 2
        assert "Accept" in hdrs
        assert "Host" in hdrs

        # Assigning over a doubled header leaves a single field behind.
        hdrs = self._2host()
        assert len(hdrs.fields) == 2
        hdrs["Host"] = "example.com"
        assert len(hdrs.fields) == 1
        assert "Host" in hdrs

    def test_delitem(self):
        """Deletion ignores case, raises on a miss, and removes all matches."""
        hdrs = semantics.Headers(Host="example.com")
        assert len(hdrs) == 1
        del hdrs["host"]
        assert len(hdrs) == 0

        # Deleting the same key again must raise KeyError.
        try:
            del hdrs["host"]
        except KeyError:
            assert True
        else:
            assert False

        hdrs = self._2host()
        del hdrs["Host"]
        assert len(hdrs) == 0

    def test_keys(self):
        """``keys`` reports a repeated header name only once."""
        names = semantics.Headers(Host="example.com").keys()
        assert len(names) == 1
        assert names[0] == "Host"

        names = self._2host().keys()
        assert len(names) == 1
        assert names[0] == "Host"

    def test_eq_ne(self):
        """Equality is sensitive to header-name case; ``!=`` mirrors ``==``."""
        upper = semantics.Headers(Host="example.com")
        lower = semantics.Headers(host="example.com")
        assert not (upper == lower)
        assert upper != lower

        left = semantics.Headers(Host="example.com")
        right = semantics.Headers(Host="example.com")
        assert left == right
        assert not (left != right)

        # Comparison with an unrelated type is simply unequal.
        assert left != 42

    def test_get_all(self):
        """``get_all`` lists every value for a name, or an empty list."""
        hdrs = self._2host()
        assert hdrs.get_all("host") == ["example.com", "example.org"]
        assert hdrs.get_all("accept") == []

    def test_set_all(self):
        """``set_all`` adds a new header or replaces every existing value."""
        hdrs = semantics.Headers(Host="example.com")
        hdrs.set_all("Accept", ["text/plain"])
        assert len(hdrs) == 2
        assert "accept" in hdrs

        hdrs = self._2host()
        hdrs.set_all("Host", ["example.org"])
        assert hdrs["host"] == "example.org"

        hdrs.set_all("Host", ["example.org", "example.net"])
        assert hdrs["host"] == "example.org, example.net"

    def test_state(self):
        """State round-trips through ``from_state`` and ``load_state``."""
        original = self._2host()
        snapshot_len = len(original.get_state())
        assert snapshot_len == 2
        assert original == semantics.Headers.from_state(original.get_state())

        restored = semantics.Headers()
        assert original != restored
        restored.load_state(original.get_state())
        assert original == restored
|