Extend test suite to cover SSL. Log SSL connection errors.

This commit is contained in:
Aldo Cortesi 2012-07-20 13:21:33 +12:00
parent 76f0c3ea78
commit 03f4dcc02b
2 changed files with 50 additions and 14 deletions

View File

@ -26,7 +26,14 @@ class PathodHandler(tcp.BaseHandler):
self.server.ssloptions["keyfile"], self.server.ssloptions["keyfile"],
) )
except tcp.NetLibError, v: except tcp.NetLibError, v:
self.info(v) s = str(v)
self.server.add_log(
dict(
type = "error",
msg = s
)
)
self.info(s)
self.finish() self.finish()
while not self.finished: while not self.finished:

View File

@ -40,12 +40,13 @@ class TestPathod:
assert len(p.get_log()) <= p.LOGBUF assert len(p.get_log()) <= p.LOGBUF
class TestDaemon: class _DaemonTests:
@classmethod @classmethod
def setUpAll(self): def setUpAll(self):
self.d = test.Daemon( self.d = test.Daemon(
staticdir=tutils.test_data.path("data"), staticdir=tutils.test_data.path("data"),
anchors=[("/anchor/.*", "202")] anchors=[("/anchor/.*", "202")],
ssl = self.SSL
) )
@classmethod @classmethod
@ -56,19 +57,12 @@ class TestDaemon:
self.d.clear_log() self.d.clear_log()
def getpath(self, path): def getpath(self, path):
return requests.get("http://localhost:%s/%s"%(self.d.port, path)) scheme = "https" if self.SSL else "http"
return requests.get("%s://localhost:%s/%s"%(scheme, self.d.port, path), verify=False)
def get(self, spec): def get(self, spec):
return requests.get("http://localhost:%s/p/%s"%(self.d.port, spec)) scheme = "https" if self.SSL else "http"
return requests.get("%s://localhost:%s/p/%s"%(scheme, self.d.port, spec), verify=False)
def test_invalid_first_line(self):
c = tcp.TCPClient("localhost", self.d.port)
c.connect()
c.wfile.write("foo\n\n\n")
c.wfile.flush()
l = self.d.log()[0]
assert l["type"] == "error"
assert "foo" in l["msg"]
def test_info(self): def test_info(self):
assert tuple(self.d.info()["version"]) == version.IVERSION assert tuple(self.d.info()["version"]) == version.IVERSION
@ -96,3 +90,38 @@ class TestDaemon:
def test_anchor(self): def test_anchor(self):
rsp = self.getpath("anchor/foo") rsp = self.getpath("anchor/foo")
assert rsp.status_code == 202 assert rsp.status_code == 202
def test_invalid_first_line(self):
c = tcp.TCPClient("localhost", self.d.port)
c.connect()
if self.SSL:
c.convert_to_ssl()
c.wfile.write("foo\n\n\n")
c.wfile.flush()
l = self.d.log()[0]
assert l["type"] == "error"
assert "foo" in l["msg"]
class TestDaemon(_DaemonTests):
SSL = False
class TestDaemonSSL(_DaemonTests):
SSL = True
def test_ssl_conn_failure(self):
c = tcp.TCPClient("localhost", self.d.port)
c.rbufsize = 0
c.wbufsize = 0
c.connect()
try:
while 1:
c.wfile.write("\r\n\r\n\r\n")
except:
pass
l = self.d.log()[0]
assert l["type"] == "error"
assert "SSL" in l["msg"]