mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-30 19:30:57 +00:00
02acfb1242
- Don't crash when passed an empty string. This translated into an actual core crash, discovered while fuzzing with afl. - Taking a slice of length one out of bytes returns an integer, so the check for trailing period in this function never worked on Python3. - Add unit tests.
375 lines
11 KiB
Python
375 lines
11 KiB
Python
from __future__ import absolute_import, print_function, division
|
|
|
|
from io import BytesIO
|
|
from mock import Mock
|
|
import pytest
|
|
|
|
from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect
|
|
from netlib.http import Headers
|
|
from netlib.http.http1.read import (
|
|
read_request, read_response, read_request_head,
|
|
read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line,
|
|
_read_request_line, _parse_authority_form, _read_response_line, _check_http_version,
|
|
_read_headers, _read_chunked, get_header_tokens
|
|
)
|
|
from netlib.tutils import treq, tresp, raises
|
|
from netlib import exceptions
|
|
|
|
|
|
def test_get_header_tokens():
|
|
headers = Headers()
|
|
assert get_header_tokens(headers, "foo") == []
|
|
headers["foo"] = "bar"
|
|
assert get_header_tokens(headers, "foo") == ["bar"]
|
|
headers["foo"] = "bar, voing"
|
|
assert get_header_tokens(headers, "foo") == ["bar", "voing"]
|
|
headers.set_all("foo", ["bar, voing", "oink"])
|
|
assert get_header_tokens(headers, "foo") == ["bar", "voing", "oink"]
|
|
|
|
|
|
@pytest.mark.parametrize("input", [
|
|
b"GET / HTTP/1.1\r\n\r\nskip",
|
|
b"GET / HTTP/1.1\r\n\r\nskip",
|
|
b"GET / HTTP/1.1\r\n\r\nskip",
|
|
b"GET / HTTP/1.1 \r\n\r\nskip",
|
|
])
|
|
def test_read_request(input):
|
|
rfile = BytesIO(input)
|
|
r = read_request(rfile)
|
|
assert r.method == "GET"
|
|
assert r.content == b""
|
|
assert r.http_version == "HTTP/1.1"
|
|
assert r.timestamp_end
|
|
assert rfile.read() == b"skip"
|
|
|
|
|
|
@pytest.mark.parametrize("input", [
|
|
b"CONNECT :0 0",
|
|
])
|
|
def test_read_request_error(input):
|
|
rfile = BytesIO(input)
|
|
raises(exceptions.HttpException, read_request, rfile)
|
|
|
|
|
|
def test_read_request_head():
|
|
rfile = BytesIO(
|
|
b"GET / HTTP/1.1\r\n"
|
|
b"Content-Length: 4\r\n"
|
|
b"\r\n"
|
|
b"skip"
|
|
)
|
|
rfile.reset_timestamps = Mock()
|
|
rfile.first_byte_timestamp = 42
|
|
r = read_request_head(rfile)
|
|
assert r.method == "GET"
|
|
assert r.headers["Content-Length"] == "4"
|
|
assert r.content is None
|
|
assert rfile.reset_timestamps.called
|
|
assert r.timestamp_start == 42
|
|
assert rfile.read() == b"skip"
|
|
|
|
|
|
@pytest.mark.parametrize("input", [
|
|
b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody",
|
|
b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody",
|
|
b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody",
|
|
b"HTTP/1.1 418 I'm a teapot \r\n\r\nbody",
|
|
])
|
|
def test_read_response(input):
|
|
req = treq()
|
|
rfile = BytesIO(input)
|
|
r = read_response(rfile, req)
|
|
assert r.http_version == "HTTP/1.1"
|
|
assert r.status_code == 418
|
|
assert r.reason == "I'm a teapot"
|
|
assert r.content == b"body"
|
|
assert r.timestamp_end
|
|
|
|
|
|
def test_read_response_head():
|
|
rfile = BytesIO(
|
|
b"HTTP/1.1 418 I'm a teapot\r\n"
|
|
b"Content-Length: 4\r\n"
|
|
b"\r\n"
|
|
b"skip"
|
|
)
|
|
rfile.reset_timestamps = Mock()
|
|
rfile.first_byte_timestamp = 42
|
|
r = read_response_head(rfile)
|
|
assert r.status_code == 418
|
|
assert r.headers["Content-Length"] == "4"
|
|
assert r.content is None
|
|
assert rfile.reset_timestamps.called
|
|
assert r.timestamp_start == 42
|
|
assert rfile.read() == b"skip"
|
|
|
|
|
|
class TestReadBody(object):
|
|
def test_chunked(self):
|
|
rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar")
|
|
body = b"".join(read_body(rfile, None))
|
|
assert body == b"foo"
|
|
assert rfile.read() == b"bar"
|
|
|
|
def test_known_size(self):
|
|
rfile = BytesIO(b"foobar")
|
|
body = b"".join(read_body(rfile, 3))
|
|
assert body == b"foo"
|
|
assert rfile.read() == b"bar"
|
|
|
|
def test_known_size_limit(self):
|
|
rfile = BytesIO(b"foobar")
|
|
with raises(HttpException):
|
|
b"".join(read_body(rfile, 3, 2))
|
|
|
|
def test_known_size_too_short(self):
|
|
rfile = BytesIO(b"foo")
|
|
with raises(HttpException):
|
|
b"".join(read_body(rfile, 6))
|
|
|
|
def test_unknown_size(self):
|
|
rfile = BytesIO(b"foobar")
|
|
body = b"".join(read_body(rfile, -1))
|
|
assert body == b"foobar"
|
|
|
|
def test_unknown_size_limit(self):
|
|
rfile = BytesIO(b"foobar")
|
|
with raises(HttpException):
|
|
b"".join(read_body(rfile, -1, 3))
|
|
|
|
def test_max_chunk_size(self):
|
|
rfile = BytesIO(b"123456")
|
|
assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"]
|
|
rfile = BytesIO(b"123456")
|
|
assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"]
|
|
|
|
|
|
def test_connection_close():
|
|
headers = Headers()
|
|
assert connection_close(b"HTTP/1.0", headers)
|
|
assert not connection_close(b"HTTP/1.1", headers)
|
|
|
|
headers["connection"] = "keep-alive"
|
|
assert not connection_close(b"HTTP/1.1", headers)
|
|
|
|
headers["connection"] = "close"
|
|
assert connection_close(b"HTTP/1.1", headers)
|
|
|
|
headers["connection"] = "foobar"
|
|
assert connection_close(b"HTTP/1.0", headers)
|
|
assert not connection_close(b"HTTP/1.1", headers)
|
|
|
|
|
|
def test_expected_http_body_size():
|
|
# Expect: 100-continue
|
|
assert expected_http_body_size(
|
|
treq(headers=Headers(expect="100-continue", content_length="42"))
|
|
) == 0
|
|
|
|
# http://tools.ietf.org/html/rfc7230#section-3.3
|
|
assert expected_http_body_size(
|
|
treq(method=b"HEAD"),
|
|
tresp(headers=Headers(content_length="42"))
|
|
) == 0
|
|
assert expected_http_body_size(
|
|
treq(method=b"CONNECT"),
|
|
tresp()
|
|
) == 0
|
|
for code in (100, 204, 304):
|
|
assert expected_http_body_size(
|
|
treq(),
|
|
tresp(status_code=code)
|
|
) == 0
|
|
|
|
# chunked
|
|
assert expected_http_body_size(
|
|
treq(headers=Headers(transfer_encoding="chunked")),
|
|
) is None
|
|
|
|
# explicit length
|
|
for val in (b"foo", b"-7"):
|
|
with raises(HttpSyntaxException):
|
|
expected_http_body_size(
|
|
treq(headers=Headers(content_length=val))
|
|
)
|
|
assert expected_http_body_size(
|
|
treq(headers=Headers(content_length="42"))
|
|
) == 42
|
|
|
|
# no length
|
|
assert expected_http_body_size(
|
|
treq(headers=Headers())
|
|
) == 0
|
|
assert expected_http_body_size(
|
|
treq(headers=Headers()), tresp(headers=Headers())
|
|
) == -1
|
|
|
|
|
|
def test_get_first_line():
|
|
rfile = BytesIO(b"foo\r\nbar")
|
|
assert _get_first_line(rfile) == b"foo"
|
|
|
|
rfile = BytesIO(b"\r\nfoo\r\nbar")
|
|
assert _get_first_line(rfile) == b"foo"
|
|
|
|
with raises(HttpReadDisconnect):
|
|
rfile = BytesIO(b"")
|
|
_get_first_line(rfile)
|
|
|
|
with raises(HttpReadDisconnect):
|
|
rfile = Mock()
|
|
rfile.readline.side_effect = TcpDisconnect
|
|
_get_first_line(rfile)
|
|
|
|
|
|
def test_read_request_line():
|
|
def t(b):
|
|
return _read_request_line(BytesIO(b))
|
|
|
|
assert (t(b"GET / HTTP/1.1") ==
|
|
("relative", b"GET", None, None, None, b"/", b"HTTP/1.1"))
|
|
assert (t(b"OPTIONS * HTTP/1.1") ==
|
|
("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1"))
|
|
assert (t(b"CONNECT foo:42 HTTP/1.1") ==
|
|
("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1"))
|
|
assert (t(b"GET http://foo:42/bar HTTP/1.1") ==
|
|
("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1"))
|
|
|
|
with raises(HttpSyntaxException):
|
|
t(b"GET / WTF/1.1")
|
|
with raises(HttpSyntaxException):
|
|
t(b"this is not http")
|
|
with raises(HttpReadDisconnect):
|
|
t(b"")
|
|
|
|
|
|
def test_parse_authority_form():
|
|
assert _parse_authority_form(b"foo:42") == (b"foo", 42)
|
|
with raises(HttpSyntaxException):
|
|
_parse_authority_form(b"foo")
|
|
with raises(HttpSyntaxException):
|
|
_parse_authority_form(b"foo:bar")
|
|
with raises(HttpSyntaxException):
|
|
_parse_authority_form(b"foo:99999999")
|
|
with raises(HttpSyntaxException):
|
|
_parse_authority_form(b"f\x00oo:80")
|
|
|
|
|
|
def test_read_response_line():
|
|
def t(b):
|
|
return _read_response_line(BytesIO(b))
|
|
|
|
assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK")
|
|
assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"")
|
|
|
|
# https://github.com/mitmproxy/mitmproxy/issues/784
|
|
assert t(b"HTTP/1.1 200 Non-Autoris\xc3\xa9") == (b"HTTP/1.1", 200, b"Non-Autoris\xc3\xa9")
|
|
|
|
with raises(HttpSyntaxException):
|
|
assert t(b"HTTP/1.1")
|
|
|
|
with raises(HttpSyntaxException):
|
|
t(b"HTTP/1.1 OK OK")
|
|
with raises(HttpSyntaxException):
|
|
t(b"WTF/1.1 200 OK")
|
|
with raises(HttpReadDisconnect):
|
|
t(b"")
|
|
|
|
|
|
def test_check_http_version():
|
|
_check_http_version(b"HTTP/0.9")
|
|
_check_http_version(b"HTTP/1.0")
|
|
_check_http_version(b"HTTP/1.1")
|
|
_check_http_version(b"HTTP/2.0")
|
|
with raises(HttpSyntaxException):
|
|
_check_http_version(b"WTF/1.0")
|
|
with raises(HttpSyntaxException):
|
|
_check_http_version(b"HTTP/1.10")
|
|
with raises(HttpSyntaxException):
|
|
_check_http_version(b"HTTP/1.b")
|
|
|
|
|
|
class TestReadHeaders(object):
|
|
@staticmethod
|
|
def _read(data):
|
|
return _read_headers(BytesIO(data))
|
|
|
|
def test_read_simple(self):
|
|
data = (
|
|
b"Header: one\r\n"
|
|
b"Header2: two\r\n"
|
|
b"\r\n"
|
|
)
|
|
headers = self._read(data)
|
|
assert headers.fields == ((b"Header", b"one"), (b"Header2", b"two"))
|
|
|
|
def test_read_multi(self):
|
|
data = (
|
|
b"Header: one\r\n"
|
|
b"Header: two\r\n"
|
|
b"\r\n"
|
|
)
|
|
headers = self._read(data)
|
|
assert headers.fields == ((b"Header", b"one"), (b"Header", b"two"))
|
|
|
|
def test_read_continued(self):
|
|
data = (
|
|
b"Header: one\r\n"
|
|
b"\ttwo\r\n"
|
|
b"Header2: three\r\n"
|
|
b"\r\n"
|
|
)
|
|
headers = self._read(data)
|
|
assert headers.fields == ((b"Header", b"one\r\n two"), (b"Header2", b"three"))
|
|
|
|
def test_read_continued_err(self):
|
|
data = b"\tfoo: bar\r\n"
|
|
with raises(HttpSyntaxException):
|
|
self._read(data)
|
|
|
|
def test_read_err(self):
|
|
data = b"foo"
|
|
with raises(HttpSyntaxException):
|
|
self._read(data)
|
|
|
|
def test_read_empty_name(self):
|
|
data = b":foo"
|
|
with raises(HttpSyntaxException):
|
|
self._read(data)
|
|
|
|
def test_read_empty_value(self):
|
|
data = b"bar:"
|
|
headers = self._read(data)
|
|
assert headers.fields == ((b"bar", b""),)
|
|
|
|
|
|
def test_read_chunked():
|
|
req = treq(content=None)
|
|
req.headers["Transfer-Encoding"] = "chunked"
|
|
|
|
data = b"1\r\na\r\n0\r\n"
|
|
with raises(HttpSyntaxException):
|
|
b"".join(_read_chunked(BytesIO(data)))
|
|
|
|
data = b"1\r\na\r\n0\r\n\r\n"
|
|
assert b"".join(_read_chunked(BytesIO(data))) == b"a"
|
|
|
|
data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n"
|
|
assert b"".join(_read_chunked(BytesIO(data))) == b"ab"
|
|
|
|
data = b"\r\n"
|
|
with raises("closed prematurely"):
|
|
b"".join(_read_chunked(BytesIO(data)))
|
|
|
|
data = b"1\r\nfoo"
|
|
with raises("malformed chunked body"):
|
|
b"".join(_read_chunked(BytesIO(data)))
|
|
|
|
data = b"foo\r\nfoo"
|
|
with raises(HttpSyntaxException):
|
|
b"".join(_read_chunked(BytesIO(data)))
|
|
|
|
data = b"5\r\naaaaa\r\n0\r\n\r\n"
|
|
with raises("too large"):
|
|
b"".join(_read_chunked(BytesIO(data), limit=2))
|