mitmproxy/test/test_protocol_http.py
2015-07-30 13:53:17 +02:00

341 lines
11 KiB
Python

import cStringIO
from cStringIO import StringIO
from mock import MagicMock
from libmproxy.protocol.http import *
from netlib import odict
from netlib.http import http1
from netlib.http.semantics import CONTENT_MISSING
import tutils
import tservers
def mock_protocol(data='', chunked=False):
rfile = cStringIO.StringIO(data)
wfile = cStringIO.StringIO()
return http1.HTTP1Protocol(rfile=rfile, wfile=wfile)
def test_HttpAuthenticationError():
x = HttpAuthenticationError({"foo": "bar"})
assert str(x)
assert "foo" in x.headers
# TODO: move test to netlib
# def test_stripped_chunked_encoding_no_content():
# """
# https://github.com/mitmproxy/mitmproxy/issues/186
# """
# r = tutils.tresp(content="")
# r.headers["Transfer-Encoding"] = ["chunked"]
# assert "Content-Length" in r._assemble_headers()
#
# r = tutils.treq(content="")
# r.headers["Transfer-Encoding"] = ["chunked"]
# assert "Content-Length" in r._assemble_headers()
#
class TestHTTPRequest:
def test_asterisk_form_in(self):
f = tutils.tflow(req=None)
protocol = mock_protocol("OPTIONS * HTTP/1.1")
f.request = HTTPRequest.from_protocol(protocol)
assert f.request.form_in == "relative"
f.request.host = f.server_conn.address.host
f.request.port = f.server_conn.address.port
f.request.scheme = "http"
assert protocol.assemble(f.request) == (
"OPTIONS * HTTP/1.1\r\n"
"Host: address:22\r\n"
"Content-Length: 0\r\n\r\n")
def test_relative_form_in(self):
protocol = mock_protocol("GET /foo\xff HTTP/1.1")
tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
r = HTTPRequest.from_protocol(protocol)
assert r.headers["Upgrade"] == ["h2c"]
def test_expect_header(self):
protocol = mock_protocol(
"GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
r = HTTPRequest.from_protocol(protocol)
assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
assert r.content == "foo"
assert protocol.tcp_handler.rfile.read(3) == "bar"
def test_authority_form_in(self):
protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
r = HTTPRequest.from_protocol(protocol)
r.scheme, r.host, r.port = "http", "address", 22
assert protocol.assemble(r) == (
"CONNECT address:22 HTTP/1.1\r\n"
"Host: address:22\r\n"
"Content-Length: 0\r\n\r\n")
assert r.pretty_url(False) == "address:22"
def test_absolute_form_in(self):
protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
r = HTTPRequest.from_protocol(protocol)
assert protocol.assemble(r) == (
"GET http://address:22/ HTTP/1.1\r\n"
"Host: address:22\r\n"
"Content-Length: 0\r\n\r\n")
def test_http_options_relative_form_in(self):
"""
Exercises fix for Issue #392.
"""
protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
r = HTTPRequest.from_protocol(protocol)
r.host = 'address'
r.port = 80
r.scheme = "http"
assert protocol.assemble(r) == (
"OPTIONS /secret/resource HTTP/1.1\r\n"
"Host: address\r\n"
"Content-Length: 0\r\n\r\n")
def test_http_options_absolute_form_in(self):
protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
r = HTTPRequest.from_protocol(protocol)
r.host = 'address'
r.port = 80
r.scheme = "http"
assert protocol.assemble(r) == (
"OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
"Host: address\r\n"
"Content-Length: 0\r\n\r\n")
def test_set_url(self):
r = tutils.treq_absolute()
r.url = "https://otheraddress:42/ORLY"
assert r.scheme == "https"
assert r.host == "otheraddress"
assert r.port == 42
assert r.path == "/ORLY"
def test_repr(self):
r = tutils.treq()
assert repr(r)
def test_pretty_host(self):
r = tutils.treq()
assert r.pretty_host(True) == "address"
assert r.pretty_host(False) == "address"
r.headers["host"] = ["other"]
assert r.pretty_host(True) == "other"
assert r.pretty_host(False) == "address"
r.host = None
assert r.pretty_host(True) == "other"
assert r.pretty_host(False) is None
del r.headers["host"]
assert r.pretty_host(True) is None
assert r.pretty_host(False) is None
# Invalid IDNA
r.headers["host"] = [".disqus.com"]
assert r.pretty_host(True) == ".disqus.com"
def test_get_form_for_urlencoded(self):
r = tutils.treq()
r.headers.add("content-type", "application/x-www-form-urlencoded")
r.get_form_urlencoded = MagicMock()
r.get_form()
assert r.get_form_urlencoded.called
def test_get_form_for_multipart(self):
r = tutils.treq()
r.headers.add("content-type", "multipart/form-data")
r.get_form_multipart = MagicMock()
r.get_form()
assert r.get_form_multipart.called
def test_get_cookies_none(self):
h = odict.ODictCaseless()
r = tutils.treq()
r.headers = h
assert len(r.get_cookies()) == 0
def test_get_cookies_single(self):
h = odict.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result) == 1
assert result['cookiename'] == ['cookievalue']
def test_get_cookies_double(self):
h = odict.ODictCaseless()
h["Cookie"] = [
"cookiename=cookievalue;othercookiename=othercookievalue"
]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result) == 2
assert result['cookiename'] == ['cookievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_get_cookies_withequalsign(self):
h = odict.ODictCaseless()
h["Cookie"] = [
"cookiename=coo=kievalue;othercookiename=othercookievalue"
]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
assert len(result) == 2
assert result['cookiename'] == ['coo=kievalue']
assert result['othercookiename'] == ['othercookievalue']
def test_set_cookies(self):
h = odict.ODictCaseless()
h["Cookie"] = ["cookiename=cookievalue"]
r = tutils.treq()
r.headers = h
result = r.get_cookies()
result["cookiename"] = ["foo"]
r.set_cookies(result)
assert r.get_cookies()["cookiename"] == ["foo"]
class TestHTTPResponse:
def test_read_from_stringio(self):
s = "HTTP/1.1 200 OK\r\n" \
"Content-Length: 7\r\n" \
"\r\n"\
"content\r\n" \
"HTTP/1.1 204 OK\r\n" \
"\r\n"
protocol = mock_protocol(s)
r = HTTPResponse.from_protocol(protocol, "GET")
assert r.status_code == 200
assert r.content == "content"
assert HTTPResponse.from_protocol(protocol, "GET").status_code == 204
protocol = mock_protocol(s)
# HEAD must not have content by spec. We should leave it on the pipe.
r = HTTPResponse.from_protocol(protocol, "HEAD")
assert r.status_code == 200
assert r.content == ""
tutils.raises(
"Invalid server response: 'content",
HTTPResponse.from_protocol, protocol, "GET"
)
def test_repr(self):
r = tutils.tresp()
assert "unknown content type" in repr(r)
r.headers["content-type"] = ["foo"]
assert "foo" in repr(r)
assert repr(tutils.tresp(content=CONTENT_MISSING))
def test_get_cookies_none(self):
h = odict.ODictCaseless()
resp = tutils.tresp()
resp.headers = h
assert not resp.get_cookies()
def test_get_cookies_simple(self):
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue"]
resp = tutils.tresp()
resp.headers = h
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
def test_get_cookies_with_parameters(self):
h = odict.ODictCaseless()
h["Set-Cookie"] = [
"cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly"]
resp = tutils.tresp()
resp.headers = h
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == "cookievalue"
attrs = result["cookiename"][0][1]
assert len(attrs) == 4
assert attrs["domain"] == ["example.com"]
assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"]
assert attrs["path"] == ["/"]
assert attrs["httponly"] == [None]
def test_get_cookies_no_value(self):
h = odict.ODictCaseless()
h["Set-Cookie"] = [
"cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/"
]
resp = tutils.tresp()
resp.headers = h
result = resp.get_cookies()
assert len(result) == 1
assert "cookiename" in result
assert result["cookiename"][0][0] == ""
assert len(result["cookiename"][0][1]) == 2
def test_get_cookies_twocookies(self):
h = odict.ODictCaseless()
h["Set-Cookie"] = ["cookiename=cookievalue", "othercookie=othervalue"]
resp = tutils.tresp()
resp.headers = h
result = resp.get_cookies()
assert len(result) == 2
assert "cookiename" in result
assert result["cookiename"][0] == ["cookievalue", odict.ODict()]
assert "othercookie" in result
assert result["othercookie"][0] == ["othervalue", odict.ODict()]
def test_set_cookies(self):
resp = tutils.tresp()
v = resp.get_cookies()
v.add("foo", ["bar", odict.ODictCaseless()])
resp.set_cookies(v)
v = resp.get_cookies()
assert len(v) == 1
assert v["foo"] == [["bar", odict.ODictCaseless()]]
class TestHTTPFlow(object):
def test_repr(self):
f = tutils.tflow(resp=True, err=True)
assert repr(f)
class TestInvalidRequests(tservers.HTTPProxTest):
ssl = True
def test_double_connect(self):
p = self.pathoc()
r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
assert r.status_code == 400
assert "Must not CONNECT on already encrypted connection" in r.body
def test_relative_request(self):
p = self.pathoc_raw()
p.connect()
r = p.request("get:/p/200")
assert r.status_code == 400
assert "Invalid HTTP request form" in r.body