mitmproxy/test/pathod/test_pathod.py

264 lines
7.1 KiB
Python
Raw Normal View History

2016-10-17 03:38:31 +00:00
import io
from pathod import pathod
from mitmproxy.net import tcp
from mitmproxy import exceptions
from . import tutils
2014-10-24 01:01:34 +00:00
2016-10-17 04:29:45 +00:00
class TestPathod:
    """Exercise the log buffer API of a ``pathod.Pathod`` instance."""

    def test_logging(self):
        """Add, look up, clear and overflow log entries."""
        log_sink = io.StringIO()
        server = pathod.Pathod(("127.0.0.1", 0), logfp=log_sink)
        assert len(server.get_log()) == 0

        entry_id = server.add_log({"s": "foo"})
        assert server.log_by_id(entry_id)
        assert len(server.get_log()) == 1

        server.clear_log()
        assert len(server.get_log()) == 0

        # Add one entry more than LOGBUF; the check below expects the stored
        # log to hold no more than LOGBUF entries afterwards.
        overflow_count = server.LOGBUF + 1
        for _ in range(overflow_count):
            server.add_log({"s": "foo"})
        assert len(server.get_log()) <= server.LOGBUF
class TestTimeout(tutils.DaemonTests):
    """Daemon tests run with ``timeout`` set to 0.01."""

    timeout = 0.01

    def test_timeout(self):
        """A request spec with a pause must raise and log a timeout entry."""
        # FIXME: Add float values to spec language, reduce test timeout to
        # increase test performance
        # This is a bodge - we have some platform difference that causes
        # different exceptions to be raised here.
        request_specs = ["get:/:p1,1"]
        tutils.raises(Exception, self.pathoc, request_specs)

        latest_entry = self.d.last_log()
        assert latest_entry["type"] == "timeout"
class TestNotAfterConnect(tutils.DaemonTests):
    """Non-SSL daemon started with the ``not_after_connect`` SSL option."""

    ssl = False
    ssloptions = {"not_after_connect": True}

    def test_connect(self):
        """A request sent via ``connect_to`` must come back with status 202."""
        upstream = ("localhost", self.d.port)
        responses, _ = self.pathoc(
            [r"get:'http://foo.com/p/202':da"],
            connect_to=upstream,
        )
        first = responses[0]
        assert first.status_code == 202
2014-03-02 02:13:56 +00:00
class TestCustomCert(tutils.DaemonTests):
    """SSL daemon configured with the ``data/testkey.pem`` certificate."""

    ssl = True
    ssloptions = {
        "certs": [(b"*", tutils.test_data.path("data/testkey.pem"))],
    }

    def test_connect(self):
        """The certificate presented must carry ``test.com`` in its subject."""
        responses, _ = self.pathoc([r"get:/p/202"])
        resp = responses[0]
        assert resp.status_code == 202
        assert resp.sslinfo

        subject = resp.sslinfo.certchain[0].get_subject()
        assert "test.com" in str(subject)
2014-03-02 02:13:56 +00:00
class TestSSLCN(tutils.DaemonTests):
    """SSL daemon configured with the ``cn`` option set to ``foo.com``."""

    ssl = True
    ssloptions = {"cn": b"foo.com"}

    def test_connect(self):
        """The first certificate in the chain must have CN ``foo.com``."""
        responses, _ = self.pathoc([r"get:/p/202"])
        resp = responses[0]
        assert resp.status_code == 202
        assert resp.sslinfo

        leaf_subject = resp.sslinfo.certchain[0].get_subject()
        assert leaf_subject.CN == "foo.com"
class TestNohang(tutils.DaemonTests):
    """Daemon tests run with ``nohang`` enabled."""

    nohang = True

    def test_nohang(self):
        """A spec containing a pause must yield status 800 and a log note."""
        resp = self.get("200:p0,0")
        assert resp.status_code == 800

        latest_entry = self.d.last_log()
        message = latest_entry["response"]["msg"]
        assert "Pauses have been disabled" in message
class TestHexdump(tutils.DaemonTests):
    """Daemon tests run with ``hexdump`` enabled."""

    hexdump = True

    def test_hexdump(self):
        """A response whose body is the escaped byte ``\\xf0`` must be truthy."""
        resp = self.get(r"200:b'\xf0'")
        assert resp
2015-06-04 08:36:50 +00:00
class TestNocraft(tutils.DaemonTests):
    """Daemon tests run with ``nocraft`` enabled."""

    nocraft = True

    def test_nocraft(self):
        """A crafted response request must be refused with status 800."""
        resp = self.get(r"200:b'\xf0'")
        assert resp.status_code == 800

        body = resp.content
        assert b"Crafting disabled" in body
2015-06-04 08:36:50 +00:00
class CommonTests(tutils.DaemonTests):
    """Test cases shared by the daemon test classes that subclass this one.

    Subclasses set ``ssl``; ``test_invalid_first_line`` reads it to decide
    whether to upgrade its raw connection.
    """

    def test_binarydata(self):
        """A response with an escaped binary body is served and logged."""
        resp = self.get(r"200:b'\xf0'")
        assert resp
        assert self.d.last_log()
        # FIXME: Other binary data elements

    def test_sizelimit(self):
        """A 1g body request must yield status 800 and a 'too large' log."""
        resp = self.get("200:b@1g")
        assert resp.status_code == 800

        latest_entry = self.d.last_log()
        assert "too large" in latest_entry["response"]["msg"]

    def test_preline(self):
        """A request with CRLF injected at offset 0 still gets status 200."""
        responses, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"])
        first = responses[0]
        assert first.status_code == 200

    def test_logs(self):
        """The daemon log can be cleared, filled by a request, and cleared."""
        daemon = self.d
        daemon.clear_log()
        assert self.get("202:da")
        assert daemon.expect_log(1)

        daemon.clear_log()
        remaining = daemon.log()
        assert len(remaining) == 0

    def test_disconnect(self):
        """A response cut off by a disconnect must raise 'unexpected eof'."""
        spec = "202:b@100k:d200"
        tutils.raises("unexpected eof", self.get, spec)

    def test_parserr(self):
        """An unparseable response spec must yield status 800."""
        resp = self.get("400:msg,b:")
        assert resp.status_code == 800

    def test_static(self):
        """A body loaded from ``file`` must contain ``testfile``."""
        resp = self.get("200:b<file")
        assert resp.status_code == 200

        stripped_body = resp.content.strip()
        assert stripped_body == b"testfile"

    def test_anchor(self):
        """The ``/anchor/foo`` path must answer with status 202."""
        resp = self.getpath("/anchor/foo")
        assert resp.status_code == 202

    def test_invalid_first_line(self):
        """A garbage first line must be logged as an error mentioning it."""
        client = tcp.TCPClient(("localhost", self.d.port))
        with client.connect():
            if self.ssl:
                client.convert_to_ssl()
            client.wfile.write(b"foo\n\n\n")
            client.wfile.flush()

            # The log is inspected while the connection is still open.
            latest_entry = self.d.last_log()
            assert latest_entry["type"] == "error"
            assert "foo" in latest_entry["msg"]

    def test_invalid_content_length(self):
        """A non-numeric content-length must raise and be logged."""
        request_specs = ["get:/:h'content-length'='foo'"]
        tutils.raises(exceptions.HttpException, self.pathoc, request_specs)

        latest_entry = self.d.last_log()
        assert latest_entry["type"] == "error"
        assert "Unparseable Content Length" in latest_entry["msg"]

    def test_invalid_headers(self):
        """A header whose name is a tab must raise and be logged."""
        request_specs = ["get:/:h'\t'='foo'"]
        tutils.raises(exceptions.HttpException, self.pathoc, request_specs)

        latest_entry = self.d.last_log()
        assert latest_entry["type"] == "error"
        assert "Invalid headers" in latest_entry["msg"]

    def test_access_denied(self):
        """Requesting ``=nonexistent`` must yield status 800."""
        resp = self.get("=nonexistent")
        assert resp.status_code == 800

    def test_source_access_denied(self):
        """A body sourced from ``/foo`` must be refused with status 800."""
        resp = self.get("200:b</foo")
        assert resp.status_code == 800

        body = resp.content
        assert b"File access denied" in body

    def test_proxy(self):
        """A request using an absolute URL must be answered with 202."""
        responses, _ = self.pathoc([r"get:'http://foo.com/p/202':da"])
        first = responses[0]
        assert first.status_code == 202

    def test_websocket(self):
        """Websocket requests to ``/p/`` and ``/p/ws`` both return 101."""
        plain, _ = self.pathoc(["ws:/p/"], ws_read_limit=0)
        assert plain[0].status_code == 101

        nested, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0)
        assert nested[0].status_code == 101

    def test_websocket_frame(self):
        """The second result of the frame exchange carries payload ``test``."""
        specs = ["ws:/p/", "wf:f'wf:b\"test\"':pa,1"]
        results, _ = self.pathoc(specs, ws_read_limit=1)

        frame = results[1]
        assert frame.payload == b"test"

    def test_websocket_frame_reflect_error(self):
        """A corrupted nested frame spec must show up as a parse error."""
        specs = ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"]
        _results, _ = self.pathoc(specs, ws_read_limit=1, timeout=1)
        # FIXME: Race Condition?
        log_text = self.d.text_log()
        assert "Parse error" in log_text

    def test_websocket_frame_disconnect_error(self):
        """A frame that disconnects mid-way must still leave a log entry."""
        specs = ["ws:/p/", "wf:b@10:d3"]
        self.pathoc(specs, ws_read_limit=0)
        assert self.d.last_log()
class TestDaemon(CommonTests):
    """Run the common tests against a daemon with ``ssl`` disabled."""

    ssl = False

    def test_connect(self):
        """A request via ``connect_to`` with ``ssl=True`` must return 202."""
        upstream = ("localhost", self.d.port)
        responses, _ = self.pathoc(
            [r"get:'http://foo.com/p/202':da"],
            connect_to=upstream,
            ssl=True,
        )
        first = responses[0]
        assert first.status_code == 202

    def test_connect_err(self):
        """The same request without ``ssl=True`` must raise HttpException."""
        upstream = ("localhost", self.d.port)
        request_specs = [r"get:'http://foo.com/p/202':da"]
        tutils.raises(
            exceptions.HttpException,
            self.pathoc,
            request_specs,
            connect_to=upstream,
        )
class TestDaemonSSL(CommonTests):
    """Run the common tests against a daemon with ``ssl`` enabled."""

    ssl = True

    def test_ssl_conn_failure(self):
        """Null bytes sent before the handshake must fail it and be logged."""
        client = tcp.TCPClient(("localhost", self.d.port))
        client.rbufsize = 0
        client.wbufsize = 0
        with client.connect():
            client.wfile.write(b"\0\0\0\0")
            tutils.raises(exceptions.TlsException, client.convert_to_ssl)

            # The log is inspected while the connection is still open.
            latest_entry = self.d.last_log()
            assert latest_entry["type"] == "error"
            assert "SSL" in latest_entry["msg"]

    def test_ssl_cipher(self):
        """The log entry for a request must report a positive cipher value."""
        responses, _ = self.pathoc([r"get:/p/202"])
        first = responses[0]
        assert first.status_code == 202

        cipher_info = self.d.last_log()["cipher"]
        assert cipher_info[1] > 0
2015-06-18 16:12:11 +00:00
class TestHTTP2(tutils.DaemonTests):
    """Daemon tests run with ``ssl`` and ``nohang`` enabled."""

    ssl = True
    nohang = True

    # The test method only exists when tcp.HAS_ALPN is truthy, so it is not
    # collected at all otherwise.
    if tcp.HAS_ALPN:

        def test_http2(self):
            """An HTTP/2 GET over SSL is expected to come back as 800."""
            responses, _ = self.pathoc(["GET:/"], ssl=True, use_http2=True)
            first = responses[0]
            assert first.status_code == 800