mitmproxy/test/http/http1/test_protocol.py

443 lines
13 KiB
Python
Raw Normal View History

2015-04-20 23:19:00 +00:00
import cStringIO
import textwrap
import binascii
2012-06-18 21:42:32 +00:00
2015-07-14 21:02:14 +00:00
from netlib import http, odict, tcp
2015-07-16 20:56:34 +00:00
from netlib.http.http1 import HTTP1Protocol
2015-07-14 21:02:14 +00:00
from ... import tutils, tservers
2012-06-23 03:07:42 +00:00
2015-07-16 20:56:34 +00:00
def mock_protocol(data='', chunked=False):
class TCPHandlerMock(object):
pass
tcp_handler = TCPHandlerMock()
tcp_handler.rfile = cStringIO.StringIO(data)
tcp_handler.wfile = cStringIO.StringIO()
return HTTP1Protocol(tcp_handler)
2012-06-18 21:42:32 +00:00
def test_has_chunked_encoding():
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
assert not HTTP1Protocol.has_chunked_encoding(h)
2012-06-18 21:42:32 +00:00
h["transfer-encoding"] = ["chunked"]
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.has_chunked_encoding(h)
2012-06-18 21:42:32 +00:00
def test_read_chunked():
2014-07-21 12:01:24 +00:00
h = odict.ODictCaseless()
h["transfer-encoding"] = ["chunked"]
2015-07-16 20:56:34 +00:00
data = "1\r\na\r\n0\r\n"
2015-04-20 23:19:00 +00:00
tutils.raises(
"malformed chunked body",
2015-07-16 20:56:34 +00:00
mock_protocol(data).read_http_body,
h, None, "GET", None, True
2015-04-20 23:19:00 +00:00
)
2012-06-18 21:42:32 +00:00
2015-07-16 20:56:34 +00:00
data = "1\r\na\r\n0\r\n\r\n"
assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a"
2012-06-23 03:07:42 +00:00
2015-07-16 20:56:34 +00:00
data = "\r\n\r\n1\r\na\r\n0\r\n\r\n"
assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a"
2012-06-18 21:42:32 +00:00
2015-07-16 20:56:34 +00:00
data = "\r\n"
2015-04-20 23:19:00 +00:00
tutils.raises(
"closed prematurely",
2015-07-16 20:56:34 +00:00
mock_protocol(data).read_http_body,
h, None, "GET", None, True
2015-04-20 23:19:00 +00:00
)
2012-06-18 21:42:32 +00:00
2015-07-16 20:56:34 +00:00
data = "1\r\nfoo"
2015-04-20 23:19:00 +00:00
tutils.raises(
"malformed chunked body",
2015-07-16 20:56:34 +00:00
mock_protocol(data).read_http_body,
h, None, "GET", None, True
2015-04-20 23:19:00 +00:00
)
2012-06-18 21:42:32 +00:00
2015-07-16 20:56:34 +00:00
data = "foo\r\nfoo"
2015-04-20 23:19:00 +00:00
tutils.raises(
2015-07-16 20:56:34 +00:00
http.HttpError,
mock_protocol(data).read_http_body,
h, None, "GET", None, True
2015-04-20 23:19:00 +00:00
)
2012-06-23 03:07:42 +00:00
2015-07-16 20:56:34 +00:00
data = "5\r\naaaaa\r\n0\r\n\r\n"
tutils.raises("too large", mock_protocol(data).read_http_body, h, 2, "GET", None, True)
2013-11-19 03:11:24 +00:00
def test_connection_close():
2012-06-18 21:42:32 +00:00
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.connection_close((1, 0), h)
assert not HTTP1Protocol.connection_close((1, 1), h)
2012-06-18 21:42:32 +00:00
h["connection"] = ["keep-alive"]
2015-07-16 20:56:34 +00:00
assert not HTTP1Protocol.connection_close((1, 1), h)
2012-06-18 21:42:32 +00:00
2012-06-23 03:07:42 +00:00
h["connection"] = ["close"]
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.connection_close((1, 1), h)
2012-06-23 03:07:42 +00:00
def test_get_header_tokens():
h = odict.ODictCaseless()
assert http.get_header_tokens(h, "foo") == []
h["foo"] = ["bar"]
assert http.get_header_tokens(h, "foo") == ["bar"]
h["foo"] = ["bar, voing"]
assert http.get_header_tokens(h, "foo") == ["bar", "voing"]
h["foo"] = ["bar, voing", "oink"]
assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
2012-06-23 03:07:42 +00:00
def test_read_http_body_request():
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
data = "testing"
assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == ""
2012-06-23 03:07:42 +00:00
2013-12-15 05:43:54 +00:00
def test_read_http_body_response():
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
data = "testing"
assert mock_protocol(data, chunked=True).read_http_body(h, None, "GET", 200, False) == "testing"
2012-06-18 21:42:32 +00:00
2012-06-18 21:42:32 +00:00
def test_read_http_body():
2013-12-15 05:43:54 +00:00
# test default case
2012-06-23 03:07:42 +00:00
h = odict.ODictCaseless()
2013-12-15 05:43:54 +00:00
h["content-length"] = [7]
2015-07-16 20:56:34 +00:00
data = "testing"
assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing"
2012-06-18 21:42:32 +00:00
2013-12-15 05:43:54 +00:00
# test content length: invalid header
2012-06-18 21:42:32 +00:00
h["content-length"] = ["foo"]
2015-07-16 20:56:34 +00:00
data = "testing"
2015-04-20 23:19:00 +00:00
tutils.raises(
2015-07-16 20:56:34 +00:00
http.HttpError,
mock_protocol(data).read_http_body,
h, None, "GET", 200, False
2015-04-20 23:19:00 +00:00
)
2012-06-18 21:42:32 +00:00
2013-12-15 05:43:54 +00:00
# test content length: invalid header #2
h["content-length"] = [-1]
2015-07-16 20:56:34 +00:00
data = "testing"
2015-04-20 23:19:00 +00:00
tutils.raises(
2015-07-16 20:56:34 +00:00
http.HttpError,
mock_protocol(data).read_http_body,
h, None, "GET", 200, False
2015-04-20 23:19:00 +00:00
)
2013-12-15 05:43:54 +00:00
# test content length: content length > actual content
2012-06-18 21:42:32 +00:00
h["content-length"] = [5]
2015-07-16 20:56:34 +00:00
data = "testing"
2015-04-20 23:19:00 +00:00
tutils.raises(
2015-07-16 20:56:34 +00:00
http.HttpError,
mock_protocol(data).read_http_body,
h, 4, "GET", 200, False
2015-04-20 23:19:00 +00:00
)
2013-12-15 05:43:54 +00:00
# test content length: content length < actual content
2015-07-16 20:56:34 +00:00
data = "testing"
assert len(mock_protocol(data).read_http_body(h, None, "GET", 200, False)) == 5
2012-06-18 21:42:32 +00:00
2013-12-15 05:43:54 +00:00
# test no content length: limit > actual content
2012-06-23 03:07:42 +00:00
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
data = "testing"
assert len(mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False)) == 7
2013-12-15 05:43:54 +00:00
# test no content length: limit < actual content
2015-07-16 20:56:34 +00:00
data = "testing"
2015-04-20 23:19:00 +00:00
tutils.raises(
2015-07-16 20:56:34 +00:00
http.HttpError,
mock_protocol(data, chunked=True).read_http_body,
h, 4, "GET", 200, False
2015-04-20 23:19:00 +00:00
)
2012-06-23 03:07:42 +00:00
2013-12-15 05:43:54 +00:00
# test chunked
2012-06-23 03:07:42 +00:00
h = odict.ODictCaseless()
h["transfer-encoding"] = ["chunked"]
2015-07-16 20:56:34 +00:00
data = "5\r\naaaaa\r\n0\r\n\r\n"
assert mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False) == "aaaaa"
2012-06-23 03:07:42 +00:00
def test_expected_http_body_size():
# gibber in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["foo"]
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None
# negative number in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["-7"]
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None
# explicit length
h = odict.ODictCaseless()
h["content-length"] = ["5"]
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == 5
# no length
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == -1
# no length request
h = odict.ODictCaseless()
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.expected_http_body_size(h, True, "GET", None) == 0
2012-06-18 21:42:32 +00:00
2012-06-18 21:42:32 +00:00
def test_parse_http_protocol():
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.parse_http_protocol("HTTP/1.1") == (1, 1)
assert HTTP1Protocol.parse_http_protocol("HTTP/0.0") == (0, 0)
assert not HTTP1Protocol.parse_http_protocol("HTTP/a.1")
assert not HTTP1Protocol.parse_http_protocol("HTTP/1.a")
assert not HTTP1Protocol.parse_http_protocol("foo/0.0")
assert not HTTP1Protocol.parse_http_protocol("HTTP/x")
2012-06-18 21:42:32 +00:00
def test_parse_init_connect():
2015-07-16 20:56:34 +00:00
assert HTTP1Protocol.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
assert not HTTP1Protocol.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
assert not HTTP1Protocol.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
assert not HTTP1Protocol.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
assert not HTTP1Protocol.parse_init_connect("bogus")
assert not HTTP1Protocol.parse_init_connect("GET host.com:443 HTTP/1.0")
assert not HTTP1Protocol.parse_init_connect("CONNECT host.com443 HTTP/1.0")
assert not HTTP1Protocol.parse_init_connect("CONNECT host.com:443 foo/1.0")
assert not HTTP1Protocol.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
2012-06-18 21:42:32 +00:00
2013-05-05 01:49:20 +00:00
def test_parse_init_proxy():
2012-06-18 21:42:32 +00:00
u = "GET http://foo.com:8888/test HTTP/1.1"
2015-07-16 20:56:34 +00:00
m, s, h, po, pa, httpversion = HTTP1Protocol.parse_init_proxy(u)
2012-06-18 21:42:32 +00:00
assert m == "GET"
assert s == "http"
assert h == "foo.com"
assert po == 8888
assert pa == "/test"
assert httpversion == (1, 1)
2013-03-03 09:13:23 +00:00
u = "G\xfeET http://foo.com:8888/test HTTP/1.1"
2015-07-16 20:56:34 +00:00
assert not HTTP1Protocol.parse_init_proxy(u)
2013-03-03 09:13:23 +00:00
2015-07-16 20:56:34 +00:00
assert not HTTP1Protocol.parse_init_proxy("invalid")
assert not HTTP1Protocol.parse_init_proxy("GET invalid HTTP/1.1")
assert not HTTP1Protocol.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
2012-06-18 21:42:32 +00:00
def test_parse_init_http():
u = "GET /test HTTP/1.1"
2015-07-16 20:56:34 +00:00
m, u, httpversion = HTTP1Protocol.parse_init_http(u)
2012-06-18 21:42:32 +00:00
assert m == "GET"
assert u == "/test"
assert httpversion == (1, 1)
2013-03-03 09:13:23 +00:00
u = "G\xfeET /test HTTP/1.1"
2015-07-16 20:56:34 +00:00
assert not HTTP1Protocol.parse_init_http(u)
2013-03-03 09:13:23 +00:00
2015-07-16 20:56:34 +00:00
assert not HTTP1Protocol.parse_init_http("invalid")
assert not HTTP1Protocol.parse_init_http("GET invalid HTTP/1.1")
assert not HTTP1Protocol.parse_init_http("GET /test foo/1.1")
assert not HTTP1Protocol.parse_init_http("GET /test\xc0 HTTP/1.1")
2012-06-18 21:42:32 +00:00
2012-06-18 21:42:32 +00:00
class TestReadHeaders:
def _read(self, data, verbatim=False):
if not verbatim:
data = textwrap.dedent(data)
data = data.strip()
2015-07-16 20:56:34 +00:00
return mock_protocol(data).read_headers()
2012-06-18 21:42:32 +00:00
def test_read_simple(self):
data = """
Header: one
Header2: two
\r\n
"""
h = self._read(data)
assert h.lst == [["Header", "one"], ["Header2", "two"]]
2012-06-18 21:42:32 +00:00
def test_read_multi(self):
data = """
Header: one
Header: two
\r\n
"""
h = self._read(data)
assert h.lst == [["Header", "one"], ["Header", "two"]]
2012-06-18 21:42:32 +00:00
def test_read_continued(self):
data = """
Header: one
\ttwo
Header2: three
\r\n
"""
h = self._read(data)
assert h.lst == [["Header", "one\r\n two"], ["Header2", "three"]]
2012-06-18 21:42:32 +00:00
def test_read_continued_err(self):
data = "\tfoo: bar\r\n"
assert self._read(data, True) is None
def test_read_err(self):
data = """
foo
"""
assert self._read(data) is None
2012-06-18 21:42:32 +00:00
2013-12-15 05:43:54 +00:00
class NoContentLengthHTTPHandler(tcp.BaseHandler):
2013-12-15 05:43:54 +00:00
def handle(self):
self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n")
self.wfile.flush()
2015-06-22 02:52:23 +00:00
class TestReadResponseNoContentLength(tservers.ServerTestBase):
2013-12-15 05:43:54 +00:00
handler = NoContentLengthHTTPHandler
def test_no_content_length(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
2013-12-15 05:43:54 +00:00
c.connect()
2015-07-16 20:56:34 +00:00
resp = HTTP1Protocol(c).read_response("GET", None)
2015-07-19 16:17:30 +00:00
assert resp.body == "bar\r\n\r\n"
2013-12-15 05:43:54 +00:00
2012-06-24 10:45:40 +00:00
def test_read_response():
def tst(data, method, limit, include_body=True):
2012-06-24 10:45:40 +00:00
data = textwrap.dedent(data)
2015-07-16 20:56:34 +00:00
return mock_protocol(data).read_response(
method, limit, include_body=include_body
)
2012-06-24 10:45:40 +00:00
tutils.raises("server disconnect", tst, "", "GET", None)
2012-06-24 10:45:40 +00:00
tutils.raises("invalid server response", tst, "foo", "GET", None)
data = """
HTTP/1.1 200 OK
"""
2015-07-14 21:02:14 +00:00
assert tst(data, "GET", None) == http.Response(
2015-04-20 23:19:00 +00:00
(1, 1), 200, 'OK', odict.ODictCaseless(), ''
)
2012-06-24 10:45:40 +00:00
data = """
HTTP/1.1 200
"""
2015-07-14 21:02:14 +00:00
assert tst(data, "GET", None) == http.Response(
2015-04-20 23:19:00 +00:00
(1, 1), 200, '', odict.ODictCaseless(), ''
)
2012-06-24 10:45:40 +00:00
data = """
HTTP/x 200 OK
"""
tutils.raises("invalid http version", tst, data, "GET", None)
data = """
HTTP/1.1 xx OK
"""
tutils.raises("invalid server response", tst, data, "GET", None)
data = """
HTTP/1.1 100 CONTINUE
HTTP/1.1 200 OK
"""
2015-07-14 21:02:14 +00:00
assert tst(data, "GET", None) == http.Response(
2015-04-20 23:19:00 +00:00
(1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
)
2012-06-24 10:45:40 +00:00
data = """
HTTP/1.1 200 OK
2012-06-24 11:16:06 +00:00
Content-Length: 3
2012-06-24 10:45:40 +00:00
foo
"""
2015-07-19 16:17:30 +00:00
assert tst(data, "GET", None).body == 'foo'
assert tst(data, "HEAD", None).body == ''
2012-06-24 10:45:40 +00:00
data = """
HTTP/1.1 200 OK
\tContent-Length: 3
foo
"""
tutils.raises("invalid headers", tst, data, "GET", None)
data = """
HTTP/1.1 200 OK
Content-Length: 3
foo
"""
2015-07-19 16:17:30 +00:00
assert tst(data, "GET", None, include_body=False).body is None
2012-06-24 10:45:40 +00:00
def test_get_request_line():
2015-07-16 20:56:34 +00:00
data = "\nfoo"
p = mock_protocol(data)
assert p.get_request_line() == "foo"
assert not p.get_request_line()
class TestReadRequest():
def tst(self, data, **kwargs):
2015-07-16 20:56:34 +00:00
return mock_protocol(data).read_request(**kwargs)
def test_invalid(self):
tutils.raises(
"bad http request",
self.tst,
"xxx"
)
tutils.raises(
"bad http request line",
self.tst,
"get /\xff HTTP/1.1"
)
tutils.raises(
"invalid headers",
self.tst,
"get / HTTP/1.1\r\nfoo"
)
2015-04-29 20:41:13 +00:00
tutils.raises(
tcp.NetLibDisconnect,
self.tst,
"\r\n"
)
def test_asterisk_form_in(self):
v = self.tst("OPTIONS * HTTP/1.1")
assert v.form_in == "relative"
assert v.method == "OPTIONS"
def test_absolute_form_in(self):
tutils.raises(
"Bad HTTP request line",
self.tst,
"GET oops-no-protocol.com HTTP/1.1"
)
v = self.tst("GET http://address:22/ HTTP/1.1")
assert v.form_in == "absolute"
assert v.port == 22
assert v.host == "address"
assert v.scheme == "http"
def test_connect(self):
tutils.raises(
"Bad HTTP request line",
self.tst,
"CONNECT oops-no-port.com HTTP/1.1"
)
v = self.tst("CONNECT foo.com:443 HTTP/1.1")
assert v.form_in == "authority"
assert v.method == "CONNECT"
assert v.port == 443
assert v.host == "foo.com"
def test_expect(self):
2015-07-16 20:56:34 +00:00
data = "".join(
"GET / HTTP/1.1\r\n"
"Content-Length: 3\r\n"
"Expect: 100-continue\r\n\r\n"
2015-07-16 20:56:34 +00:00
"foobar"
)
2015-07-16 20:56:34 +00:00
p = mock_protocol(data)
v = p.read_request()
assert p.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
2015-07-19 16:17:30 +00:00
assert v.body == "foo"
2015-07-16 20:56:34 +00:00
assert p.tcp_handler.rfile.read(3) == "bar"