mitmproxy/test/test_pathod.py

242 lines
6.5 KiB
Python
Raw Normal View History

from libpathod import pathod, version
from netlib import tcp, http, certutils
2012-06-24 04:38:32 +00:00
import tutils
2014-10-24 01:01:34 +00:00
2012-06-20 22:56:30 +00:00
class TestPathod:
def test_instantiation(self):
2012-06-24 04:20:50 +00:00
p = pathod.Pathod(
2014-10-24 01:01:34 +00:00
("127.0.0.1", 0),
anchors = [(".*", "200:da")]
)
2012-06-24 04:20:50 +00:00
assert p.anchors
2014-10-24 01:01:34 +00:00
tutils.raises(
"invalid regex",
pathod.Pathod,
("127.0.0.1", 0),
anchors=[("*", "200:da")]
)
tutils.raises(
"invalid page spec",
pathod.Pathod,
("127.0.0.1", 0),
anchors=[("foo", "bar")]
)
def test_logging(self):
p = pathod.Pathod(("127.0.0.1", 0))
assert len(p.get_log()) == 0
id = p.add_log(dict(s="foo"))
assert p.log_by_id(id)
assert len(p.get_log()) == 1
p.clear_log()
assert len(p.get_log()) == 0
for i in range(p.LOGBUF + 1):
p.add_log(dict(s="foo"))
assert len(p.get_log()) <= p.LOGBUF
class TestNoWeb(tutils.DaemonTests):
noweb = True
def test_noweb(self):
assert self.get("200:da").status_code == 200
assert self.getpath("/").status_code == 800
class TestTimeout(tutils.DaemonTests):
timeout = 0.01
def test_noweb(self):
# FIXME: Add float values to spec language, reduce test timeout to
# increase test performance
2013-02-24 02:36:35 +00:00
tutils.raises("server disconnect", self.pathoc, "get:/:p1,1")
assert self.d.last_log()["type"] == "timeout"
class TestNoApi(tutils.DaemonTests):
noapi = True
def test_noapi(self):
assert self.getpath("/log").status_code == 404
r = self.getpath("/")
assert r.status_code == 200
assert not "Log" in r.content
class TestNotAfterConnect(tutils.DaemonTests):
ssl = False
ssloptions = dict(
not_after_connect = True
)
def test_connect(self):
2014-10-24 01:01:34 +00:00
r = self.pathoc(
r"get:'http://foo.com/p/202':da",
connect_to=("localhost", self.d.port)
)
assert r.status_code == 202
2014-03-02 02:13:56 +00:00
class TestCustomCert(tutils.DaemonTests):
ssl = True
ssloptions = dict(
certs = [("*", tutils.test_data.path("data/testkey.pem"))],
2014-03-02 02:13:56 +00:00
)
def test_connect(self):
r = self.pathoc(r"get:/p/202")
assert r.status_code == 202
assert r.sslinfo
assert "Widgits" in str(r.sslinfo.certchain[0].get_subject())
2014-03-02 02:13:56 +00:00
class TestSSLCN(tutils.DaemonTests):
ssl = True
ssloptions = dict(
cn = "foo.com"
)
def test_connect(self):
r = self.pathoc(r"get:/p/202")
assert r.status_code == 202
assert r.sslinfo
assert r.sslinfo.certchain[0].get_subject().CN == "foo.com"
class TestNohang(tutils.DaemonTests):
nohang = True
def test_nohang(self):
r = self.get("200:p0,0")
assert r.status_code == 800
l = self.d.last_log()
assert "Pauses have been disabled" in l["response"]["msg"]
class TestHexdump(tutils.DaemonTests):
hexdump = True
def test_hexdump(self):
r = self.get(r"200:b'\xf0'")
class CommonTests(tutils.DaemonTests):
def test_binarydata(self):
r = self.get(r"200:b'\xf0'")
l = self.d.last_log()
# FIXME: Other binary data elements
def test_sizelimit(self):
r = self.get("200:b@1g")
assert r.status_code == 800
l = self.d.last_log()
assert "too large" in l["response"]["msg"]
def test_preline(self):
r = self.pathoc(r"get:'/p/200':i0,'\r\n'")
assert r.status_code == 200
def test_info(self):
assert tuple(self.d.info()["version"]) == version.IVERSION
def test_logs(self):
assert self.d.clear_log()
assert not self.d.last_log()
rsp = self.get("202:da")
assert len(self.d.log()) == 1
assert self.d.clear_log()
assert len(self.d.log()) == 0
def test_disconnect(self):
rsp = self.get("202:b@100k:d200")
assert len(rsp.content) < 200
def test_parserr(self):
rsp = self.get("400:msg,b:")
assert rsp.status_code == 800
def test_static(self):
rsp = self.get("200:b<file")
assert rsp.status_code == 200
assert rsp.content.strip() == "testfile"
def test_anchor(self):
rsp = self.getpath("anchor/foo")
assert rsp.status_code == 202
def test_invalid_first_line(self):
c = tcp.TCPClient(("localhost", self.d.port))
c.connect()
if self.ssl:
c.convert_to_ssl()
c.wfile.write("foo\n\n\n")
c.wfile.flush()
l = self.d.last_log()
assert l["type"] == "error"
assert "foo" in l["msg"]
2012-07-22 00:30:10 +00:00
def test_invalid_body(self):
2014-10-24 01:01:34 +00:00
tutils.raises(
http.HttpError,
self.pathoc,
"get:/:h'content-length'='foo'"
)
l = self.d.last_log()
2012-07-22 00:30:10 +00:00
assert l["type"] == "error"
assert "Invalid" in l["msg"]
2012-07-30 01:52:40 +00:00
def test_invalid_headers(self):
tutils.raises(http.HttpError, self.pathoc, "get:/:h'\t'='foo'")
l = self.d.last_log()
2012-07-30 01:52:40 +00:00
assert l["type"] == "error"
assert "Invalid headers" in l["msg"]
def test_access_denied(self):
rsp = self.get("=nonexistent")
assert rsp.status_code == 800
2012-07-23 05:53:17 +00:00
def test_source_access_denied(self):
rsp = self.get("200:b</foo")
assert rsp.status_code == 800
assert "File access denied" in rsp.content
2012-07-23 05:53:17 +00:00
def test_proxy(self):
r = self.pathoc(r"get:'http://foo.com/p/202':da")
assert r.status_code == 202
class TestDaemon(CommonTests):
ssl = False
def test_connect(self):
r = self.pathoc(
r"get:'http://foo.com/p/202':da",
connect_to=("localhost", self.d.port),
ssl=True
)
assert r.status_code == 202
def test_connect_err(self):
tutils.raises(
http.HttpError,
self.pathoc,
r"get:'http://foo.com/p/202':da",
connect_to=("localhost", self.d.port)
)
class TestDaemonSSL(CommonTests):
ssl = True
2014-10-24 01:01:34 +00:00
def _test_ssl_conn_failure(self):
c = tcp.TCPClient(("localhost", self.d.port))
c.rbufsize = 0
c.wbufsize = 0
c.connect()
try:
while 1:
c.wfile.write("\r\n\r\n\r\n")
except:
pass
l = self.d.last_log()
assert l["type"] == "error"
assert "SSL" in l["msg"]
def test_ssl_cipher(self):
r = self.pathoc(r"get:/p/202")
assert r.status_code == 202
assert self.d.last_log()["cipher"][1] > 0