2012-02-19 22:04:07 +00:00
|
|
|
import cStringIO, textwrap
|
2012-02-19 21:34:32 +00:00
|
|
|
from cStringIO import StringIO
|
2010-02-16 04:09:07 +00:00
|
|
|
import libpry
|
2012-02-19 22:04:07 +00:00
|
|
|
from libmproxy import proxy, flow
|
2012-06-09 01:42:43 +00:00
|
|
|
import tutils
|
2010-02-16 04:09:07 +00:00
|
|
|
|
2012-06-10 04:49:59 +00:00
|
|
|
|
|
|
|
def test_has_chunked_encoding():
    """has_chunked_encoding is false for empty headers and true once a
    ``transfer-encoding: chunked`` header has been set."""
    headers = flow.ODictCaseless()
    assert not proxy.has_chunked_encoding(headers)

    headers["transfer-encoding"] = ["chunked"]
    assert proxy.has_chunked_encoding(headers)
|
2010-02-16 04:09:07 +00:00
|
|
|
|
2011-03-12 03:00:01 +00:00
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
def test_read_chunked():
    """Exercise proxy.read_chunked with one well-formed stream and several
    streams that must be rejected."""
    # Same stream as the passing case below, minus the trailing CRLF.
    truncated = cStringIO.StringIO("1\r\na\r\n0\r\n")
    tutils.raises(IOError, proxy.read_chunked, truncated, None)

    complete = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
    body = proxy.read_chunked(complete, None)
    assert body == "a"

    # Remaining inputs are all expected to raise; checked in order.
    rejected = [
        (IOError, "\r\n"),
        (IOError, "1\r\nfoo"),
        (proxy.ProxyError, "foo\r\nfoo"),
    ]
    for expected_exc, raw in rejected:
        stream = cStringIO.StringIO(raw)
        tutils.raises(expected_exc, proxy.read_chunked, stream, None)
|
2011-09-09 02:49:34 +00:00
|
|
|
|
2012-02-24 23:19:54 +00:00
|
|
|
|
2012-06-10 04:49:59 +00:00
|
|
|
def test_request_connection_close():
    """Check request_connection_close for the (1, 0) and (1, 1) protocol
    versions, with empty headers and with ``connection: keep-alive``."""
    http10 = (1, 0)
    http11 = (1, 1)
    headers = flow.ODictCaseless()

    # No connection header present.
    assert proxy.request_connection_close(http10, headers)
    assert not proxy.request_connection_close(http11, headers)

    # Explicit keep-alive header.
    headers["connection"] = ["keep-alive"]
    assert not proxy.request_connection_close(http11, headers)
|
2012-06-09 22:10:46 +00:00
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
def test_read_http_body():
    """Exercise proxy.read_http_body against a 7-byte stream with various
    content-length headers and third/fourth argument combinations."""

    def fresh_stream():
        # Every call below needs its own unread copy of the payload.
        return cStringIO.StringIO("testing")

    headers = flow.ODict()
    assert proxy.read_http_body(fresh_stream(), headers, False, None) == ""

    # Non-numeric content-length is rejected.
    headers["content-length"] = ["foo"]
    tutils.raises(
        proxy.ProxyError,
        proxy.read_http_body, fresh_stream(), headers, False, None
    )

    # content-length of 5: five bytes are returned, unless the fourth
    # argument is 4, in which case a ProxyError is expected.
    headers["content-length"] = [5]
    assert len(proxy.read_http_body(fresh_stream(), headers, False, None)) == 5
    tutils.raises(
        proxy.ProxyError,
        proxy.read_http_body, fresh_stream(), headers, False, 4
    )

    # No headers, third argument True: result length is 4 with a fourth
    # argument of 4, and the full 7 bytes with a fourth argument of 100.
    headers = flow.ODict()
    assert len(proxy.read_http_body(fresh_stream(), headers, True, 4)) == 4
    assert len(proxy.read_http_body(fresh_stream(), headers, True, 100)) == 7
|
2011-09-09 02:49:34 +00:00
|
|
|
|
2011-03-12 03:00:01 +00:00
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
class TestFileLike:
    """Tests for the proxy.FileLike wrapper."""

    def test_wrap(self):
        """Wrap a cStringIO object and check flush, readline and attribute
        access on the wrapper."""
        wrapped = proxy.FileLike(cStringIO.StringIO("foobar\nfoobar"))
        wrapped.flush()

        first = wrapped.readline()
        assert first == "foobar\n"
        second = wrapped.readline()
        assert second == "foobar"

        # Test __getattr__
        assert wrapped.isatty
|
2010-02-16 04:09:07 +00:00
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
class TestProxyError:
    """Tests for proxy.ProxyError."""

    def test_simple(self):
        """A ProxyError built from a code and message has a non-empty repr."""
        err = proxy.ProxyError(111, "msg")
        text = repr(err)
        assert text
|
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
class TestReadHeaders:
    """Tests for proxy.read_headers."""

    def _read(self, block):
        """Dedent and strip ``block``, then parse it with proxy.read_headers."""
        text = textwrap.dedent(block).strip()
        return proxy.read_headers(StringIO(text))

    def test_read_simple(self):
        """Two distinct headers are each returned as a one-element list."""
        parsed = self._read("""
            Header: one
            Header2: two
            \r\n
        """)
        assert parsed["header"] == ["one"]
        assert parsed["header2"] == ["two"]

    def test_read_multi(self):
        """A repeated header name collects its values in order."""
        parsed = self._read("""
            Header: one
            Header: two
            \r\n
        """)
        assert parsed["header"] == ["one", "two"]

    def test_read_continued(self):
        """A tab-indented line is folded into the preceding header value."""
        parsed = self._read("""
            Header: one
            \ttwo
            Header2: three
            \r\n
        """)
        assert parsed["header"] == ['one\r\n two']
|
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
def test_parse_http_protocol():
    """parse_http_protocol returns a (major, minor) tuple for HTTP version
    strings and a falsy value for anything else."""
    accepted = [
        ("HTTP/1.1", (1, 1)),
        ("HTTP/0.0", (0, 0)),
    ]
    for text, version in accepted:
        assert proxy.parse_http_protocol(text) == version

    assert not proxy.parse_http_protocol("foo/0.0")
|
2012-06-03 13:04:57 +00:00
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
def test_parse_init_connect():
    """parse_init_connect accepts a well-formed CONNECT line and returns a
    falsy value for malformed ones."""
    assert proxy.parse_init_connect("CONNECT host.com:443 HTTP/1.0")

    rejected = (
        "bogus",
        "GET host.com:443 HTTP/1.0",
        "CONNECT host.com443 HTTP/1.0",
        "CONNECT host.com:443 foo/1.0",
    )
    for line in rejected:
        assert not proxy.parse_init_connect(line)
|
2012-06-03 13:04:57 +00:00
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
def test_prase_init_proxy():
    """parse_init_proxy splits an absolute-URL request line into its six
    components and returns a falsy value for malformed lines."""
    # NOTE(review): the function name is spelled "prase"; kept as-is so the
    # test's identifier does not change.
    request_line = "GET http://foo.com:8888/test HTTP/1.1"
    method, scheme, host, port, path, httpversion = \
        proxy.parse_init_proxy(request_line)
    assert method == "GET"
    assert scheme == "http"
    assert host == "foo.com"
    assert port == 8888
    assert path == "/test"
    assert httpversion == (1, 1)

    rejected = (
        "invalid",
        "GET invalid HTTP/1.1",
        "GET http://foo.com:8888/test foo/1.1",
    )
    for line in rejected:
        assert not proxy.parse_init_proxy(line)
|
2012-06-03 13:04:57 +00:00
|
|
|
|
|
|
|
|
2012-06-09 01:42:43 +00:00
|
|
|
def test_parse_init_http():
    """parse_init_http splits a relative-path request line into method, path
    and version, and returns a falsy value for malformed lines."""
    request_line = "GET /test HTTP/1.1"
    method, path, httpversion = proxy.parse_init_http(request_line)
    assert method == "GET"
    assert path == "/test"
    assert httpversion == (1, 1)

    rejected = (
        "invalid",
        "GET invalid HTTP/1.1",
        "GET /test foo/1.1",
    )
    for line in rejected:
        assert not proxy.parse_init_http(line)
|