2014-03-02 00:45:35 +00:00
|
|
|
import pprint
|
2012-07-23 04:18:47 +00:00
|
|
|
from libpathod import pathod, version
|
2012-07-22 00:30:10 +00:00
|
|
|
from netlib import tcp, http
|
2012-07-23 07:55:33 +00:00
|
|
|
import requests
|
2012-06-24 04:38:32 +00:00
|
|
|
import tutils
|
2012-04-28 22:56:33 +00:00
|
|
|
|
2012-06-20 22:56:30 +00:00
|
|
|
class TestPathod:
|
|
|
|
def test_instantiation(self):
|
2012-06-24 04:20:50 +00:00
|
|
|
p = pathod.Pathod(
|
|
|
|
("127.0.0.1", 0),
|
2012-11-17 20:04:49 +00:00
|
|
|
anchors = [(".*", "200:da")]
|
2012-06-24 04:20:50 +00:00
|
|
|
)
|
|
|
|
assert p.anchors
|
2012-11-17 20:04:49 +00:00
|
|
|
tutils.raises("invalid regex", pathod.Pathod, ("127.0.0.1", 0), anchors=[("*", "200:da")])
|
2012-06-24 04:38:32 +00:00
|
|
|
tutils.raises("invalid page spec", pathod.Pathod, ("127.0.0.1", 0), anchors=[("foo", "bar")])
|
2012-06-21 02:29:49 +00:00
|
|
|
|
|
|
|
def test_logging(self):
|
|
|
|
p = pathod.Pathod(("127.0.0.1", 0))
|
|
|
|
assert len(p.get_log()) == 0
|
|
|
|
id = p.add_log(dict(s="foo"))
|
|
|
|
assert p.log_by_id(id)
|
|
|
|
assert len(p.get_log()) == 1
|
|
|
|
p.clear_log()
|
|
|
|
assert len(p.get_log()) == 0
|
|
|
|
|
|
|
|
for i in range(p.LOGBUF + 1):
|
|
|
|
p.add_log(dict(s="foo"))
|
|
|
|
assert len(p.get_log()) <= p.LOGBUF
|
2012-06-26 05:28:07 +00:00
|
|
|
|
|
|
|
|
2012-07-23 07:55:33 +00:00
|
|
|
class TestNoWeb(tutils.DaemonTests):
|
|
|
|
noweb = True
|
|
|
|
def test_noweb(self):
|
2012-11-17 20:04:49 +00:00
|
|
|
assert self.get("200:da").status_code == 200
|
2012-07-23 07:55:33 +00:00
|
|
|
assert self.getpath("/").status_code == 800
|
|
|
|
|
|
|
|
|
2012-09-30 23:01:02 +00:00
|
|
|
class TestTimeout(tutils.DaemonTests):
|
2012-11-17 20:04:49 +00:00
|
|
|
timeout = 0.00001
|
2012-09-30 23:01:02 +00:00
|
|
|
def test_noweb(self):
|
|
|
|
# FIXME: Add float values to spec language, reduce test timeout to
|
|
|
|
# increase test performance
|
2013-02-24 02:36:35 +00:00
|
|
|
tutils.raises("server disconnect", self.pathoc, "get:/:p1,1")
|
2012-09-30 23:01:02 +00:00
|
|
|
assert self.d.last_log()["type"] == "timeout"
|
|
|
|
|
|
|
|
|
2012-07-24 22:34:57 +00:00
|
|
|
class TestNoApi(tutils.DaemonTests):
|
|
|
|
noapi = True
|
|
|
|
def test_noapi(self):
|
|
|
|
assert self.getpath("/log").status_code == 404
|
|
|
|
r = self.getpath("/")
|
|
|
|
assert r.status_code == 200
|
|
|
|
assert not "Log" in r.content
|
|
|
|
|
|
|
|
|
2013-01-05 07:36:06 +00:00
|
|
|
class TestNotAfterConnect(tutils.DaemonTests):
|
|
|
|
ssl = False
|
2014-03-02 00:45:35 +00:00
|
|
|
ssloptions = dict(
|
|
|
|
not_after_connect = True
|
|
|
|
)
|
2013-01-05 07:36:06 +00:00
|
|
|
def test_connect(self):
|
2013-02-26 20:07:16 +00:00
|
|
|
r = self.pathoc(r"get:'http://foo.com/p/202':da", connect_to=("localhost", self.d.port))
|
|
|
|
assert r.status_code == 202
|
2013-01-05 07:36:06 +00:00
|
|
|
|
|
|
|
|
2014-03-02 02:13:56 +00:00
|
|
|
class TestCustomCert(tutils.DaemonTests):
|
|
|
|
ssl = True
|
|
|
|
ssloptions = dict(
|
|
|
|
certfile = tutils.test_data.path("data/testkey.pem"),
|
|
|
|
cacert = tutils.test_data.path("data/testkey.pem"),
|
|
|
|
)
|
|
|
|
def test_connect(self):
|
|
|
|
r = self.pathoc(r"get:/p/202")
|
|
|
|
assert r.status_code == 202
|
|
|
|
assert r.sslinfo
|
|
|
|
|
|
|
|
|
|
|
|
|
2014-03-02 00:45:35 +00:00
|
|
|
class TestSSLCN(tutils.DaemonTests):
|
|
|
|
ssl = True
|
|
|
|
ssloptions = dict(
|
|
|
|
cn = "foo.com"
|
|
|
|
)
|
|
|
|
def test_connect(self):
|
|
|
|
r = self.pathoc(r"get:/p/202")
|
|
|
|
assert r.status_code == 202
|
|
|
|
assert r.sslinfo
|
|
|
|
assert r.sslinfo.certchain[0].get_subject().CN == "foo.com"
|
|
|
|
|
|
|
|
|
2012-07-26 08:01:51 +00:00
|
|
|
class TestNohang(tutils.DaemonTests):
|
|
|
|
nohang = True
|
|
|
|
def test_nohang(self):
|
|
|
|
r = self.get("200:p0,0")
|
|
|
|
assert r.status_code == 800
|
2012-07-30 05:29:36 +00:00
|
|
|
l = self.d.last_log()
|
2012-10-27 04:40:22 +00:00
|
|
|
assert "Pauses have been disabled" in l["msg"]
|
2012-07-26 08:01:51 +00:00
|
|
|
|
|
|
|
|
2012-10-24 21:59:18 +00:00
|
|
|
class TestHexdump(tutils.DaemonTests):
|
|
|
|
hexdump = True
|
|
|
|
def test_hexdump(self):
|
|
|
|
r = self.get(r"200:b'\xf0'")
|
|
|
|
|
|
|
|
|
2012-07-23 04:18:47 +00:00
|
|
|
class CommonTests(tutils.DaemonTests):
|
2012-10-24 21:59:18 +00:00
|
|
|
def test_binarydata(self):
|
|
|
|
r = self.get(r"200:b'\xf0'")
|
|
|
|
l = self.d.last_log()
|
|
|
|
# FIXME: Other binary data elements
|
|
|
|
|
2012-07-23 03:03:56 +00:00
|
|
|
def test_sizelimit(self):
|
|
|
|
r = self.get("200:b@1g")
|
|
|
|
assert r.status_code == 800
|
2012-07-30 05:29:36 +00:00
|
|
|
l = self.d.last_log()
|
2012-10-27 04:40:22 +00:00
|
|
|
assert "too large" in l["msg"]
|
2012-07-23 03:03:56 +00:00
|
|
|
|
2012-07-21 02:14:31 +00:00
|
|
|
def test_preline(self):
|
2013-02-26 20:07:16 +00:00
|
|
|
r = self.pathoc(r"get:'/p/200':i0,'\r\n'")
|
|
|
|
assert r.status_code == 200
|
2012-07-21 02:14:31 +00:00
|
|
|
|
2012-06-26 05:28:07 +00:00
|
|
|
def test_info(self):
|
|
|
|
assert tuple(self.d.info()["version"]) == version.IVERSION
|
|
|
|
|
|
|
|
def test_logs(self):
|
2012-07-30 05:29:36 +00:00
|
|
|
assert self.d.clear_log()
|
2013-02-23 08:46:01 +00:00
|
|
|
assert not self.d.last_log()
|
2012-11-17 20:04:49 +00:00
|
|
|
rsp = self.get("202:da")
|
2012-07-30 05:29:36 +00:00
|
|
|
assert len(self.d.log()) == 1
|
2012-06-26 05:28:07 +00:00
|
|
|
assert self.d.clear_log()
|
|
|
|
assert len(self.d.log()) == 0
|
|
|
|
|
|
|
|
def test_disconnect(self):
|
|
|
|
rsp = self.get("202:b@100k:d200")
|
|
|
|
assert len(rsp.content) < 200
|
|
|
|
|
|
|
|
def test_parserr(self):
|
|
|
|
rsp = self.get("400:msg,b:")
|
|
|
|
assert rsp.status_code == 800
|
|
|
|
|
|
|
|
def test_static(self):
|
|
|
|
rsp = self.get("200:b<file")
|
|
|
|
assert rsp.status_code == 200
|
|
|
|
assert rsp.content.strip() == "testfile"
|
|
|
|
|
|
|
|
def test_anchor(self):
|
|
|
|
rsp = self.getpath("anchor/foo")
|
|
|
|
assert rsp.status_code == 202
|
2012-07-20 01:21:33 +00:00
|
|
|
|
|
|
|
def test_invalid_first_line(self):
|
2014-01-28 18:28:20 +00:00
|
|
|
c = tcp.TCPClient(("localhost", self.d.port))
|
2012-07-20 01:21:33 +00:00
|
|
|
c.connect()
|
2012-07-23 07:55:33 +00:00
|
|
|
if self.ssl:
|
2012-07-20 01:21:33 +00:00
|
|
|
c.convert_to_ssl()
|
|
|
|
c.wfile.write("foo\n\n\n")
|
|
|
|
c.wfile.flush()
|
2012-07-30 05:29:36 +00:00
|
|
|
l = self.d.last_log()
|
2012-07-20 01:21:33 +00:00
|
|
|
assert l["type"] == "error"
|
|
|
|
assert "foo" in l["msg"]
|
|
|
|
|
2012-07-22 00:30:10 +00:00
|
|
|
def test_invalid_body(self):
|
|
|
|
tutils.raises(http.HttpError, self.pathoc, "get:/:h'content-length'='foo'")
|
2012-07-30 05:29:36 +00:00
|
|
|
l = self.d.last_log()
|
2012-07-22 00:30:10 +00:00
|
|
|
assert l["type"] == "error"
|
|
|
|
assert "Invalid" in l["msg"]
|
|
|
|
|
2012-07-30 01:52:40 +00:00
|
|
|
def test_invalid_headers(self):
|
|
|
|
tutils.raises(http.HttpError, self.pathoc, "get:/:h'\t'='foo'")
|
2012-07-30 05:29:36 +00:00
|
|
|
l = self.d.last_log()
|
2012-07-30 01:52:40 +00:00
|
|
|
assert l["type"] == "error"
|
|
|
|
assert "Invalid headers" in l["msg"]
|
|
|
|
|
2012-07-22 11:37:46 +00:00
|
|
|
def test_access_denied(self):
|
|
|
|
rsp = self.get("=nonexistent")
|
|
|
|
assert rsp.status_code == 800
|
|
|
|
|
2012-07-23 05:53:17 +00:00
|
|
|
def test_source_access_denied(self):
|
|
|
|
rsp = self.get("200:b</foo")
|
|
|
|
assert rsp.status_code == 800
|
2012-10-27 23:56:08 +00:00
|
|
|
assert "File access denied" in rsp.content
|
2012-07-23 05:53:17 +00:00
|
|
|
|
2013-01-05 04:06:41 +00:00
|
|
|
def test_proxy(self):
|
2013-02-26 20:07:16 +00:00
|
|
|
r = self.pathoc(r"get:'http://foo.com/p/202':da")
|
|
|
|
assert r.status_code == 202
|
2013-01-05 04:06:41 +00:00
|
|
|
|
2012-07-20 01:21:33 +00:00
|
|
|
|
2012-07-23 04:18:47 +00:00
|
|
|
class TestDaemon(CommonTests):
|
2012-07-23 07:55:33 +00:00
|
|
|
ssl = False
|
2013-01-05 07:29:46 +00:00
|
|
|
def test_connect(self):
|
2014-03-02 00:45:35 +00:00
|
|
|
r = self.pathoc(
|
|
|
|
r"get:'http://foo.com/p/202':da",
|
|
|
|
connect_to=("localhost", self.d.port),
|
|
|
|
ssl=True
|
|
|
|
)
|
2013-02-26 20:07:16 +00:00
|
|
|
assert r.status_code == 202
|
2013-01-05 07:29:46 +00:00
|
|
|
|
|
|
|
def test_connect_err(self):
|
2014-03-02 00:45:35 +00:00
|
|
|
tutils.raises(
|
|
|
|
http.HttpError,
|
|
|
|
self.pathoc,
|
|
|
|
r"get:'http://foo.com/p/202':da",
|
|
|
|
connect_to=("localhost", self.d.port)
|
|
|
|
)
|
2012-07-20 01:21:33 +00:00
|
|
|
|
|
|
|
|
2012-07-23 04:18:47 +00:00
|
|
|
class TestDaemonSSL(CommonTests):
|
2012-07-23 07:55:33 +00:00
|
|
|
ssl = True
|
2012-07-20 01:21:33 +00:00
|
|
|
def test_ssl_conn_failure(self):
|
2014-01-28 18:28:20 +00:00
|
|
|
c = tcp.TCPClient(("localhost", self.d.port))
|
2012-07-20 01:21:33 +00:00
|
|
|
c.rbufsize = 0
|
|
|
|
c.wbufsize = 0
|
|
|
|
c.connect()
|
|
|
|
try:
|
|
|
|
while 1:
|
|
|
|
c.wfile.write("\r\n\r\n\r\n")
|
|
|
|
except:
|
|
|
|
pass
|
2012-07-30 05:29:36 +00:00
|
|
|
l = self.d.last_log()
|
2012-07-20 01:21:33 +00:00
|
|
|
assert l["type"] == "error"
|
|
|
|
assert "SSL" in l["msg"]
|
|
|
|
|