mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
Whitespace, pep8, mixed indentation
This commit is contained in:
parent
dd7ea896f2
commit
7d83e388aa
@ -331,12 +331,15 @@ def read_response(rfile, request_method, body_size_limit, include_body=True):
|
||||
Return an (httpversion, code, msg, headers, content) tuple.
|
||||
|
||||
By default, both response header and body are read.
|
||||
If include_body=False is specified, content may be one of the following:
|
||||
If include_body=False is specified, content may be one of the
|
||||
following:
|
||||
- None, if the response is technically allowed to have a response body
|
||||
- "", if the response must not have a response body (e.g. it's a response to a HEAD request)
|
||||
- "", if the response must not have a response body (e.g. it's a
|
||||
response to a HEAD request)
|
||||
"""
|
||||
line = rfile.readline()
|
||||
if line == "\r\n" or line == "\n": # Possible leftover from previous message
|
||||
# Possible leftover from previous message
|
||||
if line == "\r\n" or line == "\n":
|
||||
line = rfile.readline()
|
||||
if not line:
|
||||
raise HttpErrorConnClosed(502, "Server disconnect.")
|
||||
@ -373,7 +376,15 @@ def read_http_body(*args, **kwargs):
|
||||
)
|
||||
|
||||
|
||||
def read_http_body_chunked(rfile, headers, limit, request_method, response_code, is_request, max_chunk_size=None):
|
||||
def read_http_body_chunked(
|
||||
rfile,
|
||||
headers,
|
||||
limit,
|
||||
request_method,
|
||||
response_code,
|
||||
is_request,
|
||||
max_chunk_size=None
|
||||
):
|
||||
"""
|
||||
Read an HTTP message body:
|
||||
|
||||
|
@ -8,10 +8,12 @@ def isascii(s):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
# best way to do it in python 2.x
|
||||
def bytes_to_int(i):
|
||||
return int(i.encode('hex'), 16)
|
||||
|
||||
|
||||
def cleanBin(s, fixspacing=False):
|
||||
"""
|
||||
Cleans binary data to make it safe to display. If fixspacing is True,
|
||||
|
@ -1,4 +1,6 @@
|
||||
import cStringIO, textwrap, binascii
|
||||
import cStringIO
|
||||
import textwrap
|
||||
import binascii
|
||||
from netlib import http, odict, tcp, test
|
||||
import tutils
|
||||
|
||||
@ -21,7 +23,11 @@ def test_read_chunked():
|
||||
h["transfer-encoding"] = ["chunked"]
|
||||
s = cStringIO.StringIO("1\r\na\r\n0\r\n")
|
||||
|
||||
tutils.raises("malformed chunked body", http.read_http_body, s, h, None, "GET", None, True)
|
||||
tutils.raises(
|
||||
"malformed chunked body",
|
||||
http.read_http_body,
|
||||
s, h, None, "GET", None, True
|
||||
)
|
||||
|
||||
s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
|
||||
assert http.read_http_body(s, h, None, "GET", None, True) == "a"
|
||||
@ -30,13 +36,25 @@ def test_read_chunked():
|
||||
assert http.read_http_body(s, h, None, "GET", None, True) == "a"
|
||||
|
||||
s = cStringIO.StringIO("\r\n")
|
||||
tutils.raises("closed prematurely", http.read_http_body, s, h, None, "GET", None, True)
|
||||
tutils.raises(
|
||||
"closed prematurely",
|
||||
http.read_http_body,
|
||||
s, h, None, "GET", None, True
|
||||
)
|
||||
|
||||
s = cStringIO.StringIO("1\r\nfoo")
|
||||
tutils.raises("malformed chunked body", http.read_http_body, s, h, None, "GET", None, True)
|
||||
tutils.raises(
|
||||
"malformed chunked body",
|
||||
http.read_http_body,
|
||||
s, h, None, "GET", None, True
|
||||
)
|
||||
|
||||
s = cStringIO.StringIO("foo\r\nfoo")
|
||||
tutils.raises(http.HttpError, http.read_http_body, s, h, None, "GET", None, True)
|
||||
tutils.raises(
|
||||
http.HttpError,
|
||||
http.read_http_body,
|
||||
s, h, None, "GET", None, True
|
||||
)
|
||||
|
||||
s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
|
||||
tutils.raises("too large", http.read_http_body, s, h, 2, "GET", None, True)
|
||||
@ -87,17 +105,29 @@ def test_read_http_body():
|
||||
# test content length: invalid header
|
||||
h["content-length"] = ["foo"]
|
||||
s = cStringIO.StringIO("testing")
|
||||
tutils.raises(http.HttpError, http.read_http_body, s, h, None, "GET", 200, False)
|
||||
tutils.raises(
|
||||
http.HttpError,
|
||||
http.read_http_body,
|
||||
s, h, None, "GET", 200, False
|
||||
)
|
||||
|
||||
# test content length: invalid header #2
|
||||
h["content-length"] = [-1]
|
||||
s = cStringIO.StringIO("testing")
|
||||
tutils.raises(http.HttpError, http.read_http_body, s, h, None, "GET", 200, False)
|
||||
tutils.raises(
|
||||
http.HttpError,
|
||||
http.read_http_body,
|
||||
s, h, None, "GET", 200, False
|
||||
)
|
||||
|
||||
# test content length: content length > actual content
|
||||
h["content-length"] = [5]
|
||||
s = cStringIO.StringIO("testing")
|
||||
tutils.raises(http.HttpError, http.read_http_body, s, h, 4, "GET", 200, False)
|
||||
tutils.raises(
|
||||
http.HttpError,
|
||||
http.read_http_body,
|
||||
s, h, 4, "GET", 200, False
|
||||
)
|
||||
|
||||
# test content length: content length < actual content
|
||||
s = cStringIO.StringIO("testing")
|
||||
@ -110,7 +140,11 @@ def test_read_http_body():
|
||||
|
||||
# test no content length: limit < actual content
|
||||
s = cStringIO.StringIO("testing")
|
||||
tutils.raises(http.HttpError, http.read_http_body, s, h, 4, "GET", 200, False)
|
||||
tutils.raises(
|
||||
http.HttpError,
|
||||
http.read_http_body,
|
||||
s, h, 4, "GET", 200, False
|
||||
)
|
||||
|
||||
# test chunked
|
||||
h = odict.ODictCaseless()
|
||||
@ -271,11 +305,15 @@ def test_read_response():
|
||||
data = """
|
||||
HTTP/1.1 200 OK
|
||||
"""
|
||||
assert tst(data, "GET", None) == ((1, 1), 200, 'OK', odict.ODictCaseless(), '')
|
||||
assert tst(data, "GET", None) == (
|
||||
(1, 1), 200, 'OK', odict.ODictCaseless(), ''
|
||||
)
|
||||
data = """
|
||||
HTTP/1.1 200
|
||||
"""
|
||||
assert tst(data, "GET", None) == ((1, 1), 200, '', odict.ODictCaseless(), '')
|
||||
assert tst(data, "GET", None) == (
|
||||
(1, 1), 200, '', odict.ODictCaseless(), ''
|
||||
)
|
||||
data = """
|
||||
HTTP/x 200 OK
|
||||
"""
|
||||
@ -290,7 +328,9 @@ def test_read_response():
|
||||
|
||||
HTTP/1.1 200 OK
|
||||
"""
|
||||
assert tst(data, "GET", None) == ((1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '')
|
||||
assert tst(data, "GET", None) == (
|
||||
(1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
|
||||
)
|
||||
|
||||
data = """
|
||||
HTTP/1.1 200 OK
|
||||
@ -315,7 +355,7 @@ def test_read_response():
|
||||
|
||||
foo
|
||||
"""
|
||||
assert tst(data, "GET", None, include_body=False)[4] == None
|
||||
assert tst(data, "GET", None, include_body=False)[4] is None
|
||||
|
||||
|
||||
def test_parse_url():
|
||||
@ -363,7 +403,9 @@ def test_parse_url():
|
||||
|
||||
def test_parse_http_basic_auth():
|
||||
vals = ("basic", "foo", "bar")
|
||||
assert http.parse_http_basic_auth(http.assemble_http_basic_auth(*vals)) == vals
|
||||
assert http.parse_http_basic_auth(
|
||||
http.assemble_http_basic_auth(*vals)
|
||||
) == vals
|
||||
assert not http.parse_http_basic_auth("")
|
||||
assert not http.parse_http_basic_auth("foo bar")
|
||||
v = "basic " + binascii.b2a_base64("foo")
|
||||
|
Loading…
Reference in New Issue
Block a user