Unit test++

This commit is contained in:
Aldo Cortesi 2011-08-02 16:52:47 +12:00
parent 357502fe03
commit 1ff6a767d0
7 changed files with 89 additions and 37 deletions

View File

@ -867,6 +867,7 @@ class Options(object):
setattr(self, i, None) setattr(self, i, None)
#begin nocover
class BodyPile(urwid.Pile): class BodyPile(urwid.Pile):
def __init__(self, master): def __init__(self, master):
h = urwid.Text("Event log") h = urwid.Text("Event log")
@ -909,7 +910,6 @@ class BodyPile(urwid.Pile):
return self.focus_item.keypress( tsize, key ) return self.focus_item.keypress( tsize, key )
#begin nocover
VIEW_CONNLIST = 0 VIEW_CONNLIST = 0
VIEW_FLOW = 1 VIEW_FLOW = 1
VIEW_HELP = 2 VIEW_HELP = 2

View File

@ -159,7 +159,7 @@ class DumpMaster(flow.FlowMaster):
def _process_flow(self, f): def _process_flow(self, f):
if self.filt and not f.match(self.filt): if self.filt and not f.match(self.filt):
return return
if f.response: if f.response:
sz = utils.pretty_size(len(f.response.content)) sz = utils.pretty_size(len(f.response.content))

View File

@ -284,11 +284,16 @@ class Request(HTTPMsg):
modifications to make sure interception works properly. modifications to make sure interception works properly.
""" """
headers = self.headers.copy() headers = self.headers.copy()
utils.try_del(headers, 'proxy-connection') utils.del_all(
utils.try_del(headers, 'keep-alive') headers,
utils.try_del(headers, 'connection') [
utils.try_del(headers, 'content-length') 'proxy-connection',
utils.try_del(headers, 'transfer-encoding') 'keep-alive',
'connection',
'content-length',
'transfer-encoding'
]
)
if not 'host' in headers: if not 'host' in headers:
headers["host"] = [self.hostport()] headers["host"] = [self.hostport()]
content = self.content content = self.content
@ -426,10 +431,10 @@ class Response(HTTPMsg):
modifications to make sure interception works properly. modifications to make sure interception works properly.
""" """
headers = self.headers.copy() headers = self.headers.copy()
utils.try_del(headers, 'proxy-connection') utils.del_all(
utils.try_del(headers, 'connection') headers,
utils.try_del(headers, 'keep-alive') ['proxy-connection', 'connection', 'keep-alive', 'transfer-encoding']
utils.try_del(headers, 'transfer-encoding') )
content = self.content content = self.content
if content is not None: if content is not None:
headers["content-length"] = [str(len(content))] headers["content-length"] = [str(len(content))]

View File

@ -145,11 +145,10 @@ def hexdump(s):
return parts return parts
def try_del(dict, key): def del_all(dict, keys):
try: for key in keys:
del dict[key] if key in dict:
except KeyError: del dict[key]
pass
class Headers: class Headers:

View File

@ -22,6 +22,7 @@ class uDumpMaster(libpry.AutoTree):
req = tutils.treq() req = tutils.treq()
req.content = content req.content = content
cc = req.client_conn cc = req.client_conn
cc.connection_error = "error"
resp = tutils.tresp(req) resp = tutils.tresp(req)
resp.content = content resp.content = content
m.handle_clientconnect(cc) m.handle_clientconnect(cc)
@ -29,13 +30,22 @@ class uDumpMaster(libpry.AutoTree):
m.handle_response(resp) m.handle_response(resp)
m.handle_clientdisconnect(proxy.ClientDisconnect(cc)) m.handle_clientdisconnect(proxy.ClientDisconnect(cc))
def _dummy_cycle(self, filt, content, **options): def _dummy_cycle(self, n, filt, content, **options):
cs = StringIO() cs = StringIO()
o = dump.Options(**options) o = dump.Options(**options)
m = dump.DumpMaster(None, o, filt, outfile=cs) m = dump.DumpMaster(None, o, filt, outfile=cs)
self._cycle(m, content) for i in range(n):
self._cycle(m, content)
return cs.getvalue() return cs.getvalue()
def _flowfile(self, path):
f = open(path, "w")
fw = flow.FlowWriter(f)
t = tutils.tflow_full()
t.response = tutils.tresp(t.request)
fw.add(t)
f.close()
def test_replay(self): def test_replay(self):
cs = StringIO() cs = StringIO()
@ -44,12 +54,7 @@ class uDumpMaster(libpry.AutoTree):
t = self.tmpdir() t = self.tmpdir()
p = os.path.join(t, "rep") p = os.path.join(t, "rep")
f = open(p, "w") self._flowfile(p)
fw = flow.FlowWriter(f)
t = tutils.tflow_full()
t.response = tutils.tresp(t.request)
fw.add(t)
f.close()
o = dump.Options(server_replay=p, kill=True) o = dump.Options(server_replay=p, kill=True)
m = dump.DumpMaster(None, o, None, outfile=cs) m = dump.DumpMaster(None, o, None, outfile=cs)
@ -64,66 +69,79 @@ class uDumpMaster(libpry.AutoTree):
o = dump.Options(client_replay=p, kill=False) o = dump.Options(client_replay=p, kill=False)
m = dump.DumpMaster(None, o, None, outfile=cs) m = dump.DumpMaster(None, o, None, outfile=cs)
def test_read(self):
cs = StringIO()
t = self.tmpdir()
p = os.path.join(t, "read")
self._flowfile(p)
assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p)
libpry.raises(
dump.DumpError, self._dummy_cycle,
0, None, "", verbosity=1, rfile="/nonexistent"
)
def test_options(self): def test_options(self):
o = dump.Options(verbosity = 2) o = dump.Options(verbosity = 2)
assert o.verbosity == 2 assert o.verbosity == 2
libpry.raises(AttributeError, dump.Options, nonexistent = 2) libpry.raises(AttributeError, dump.Options, nonexistent = 2)
def test_filter(self): def test_filter(self):
assert not "GET" in self._dummy_cycle("~u foo", "", verbosity=1) assert not "GET" in self._dummy_cycle(1, "~u foo", "", verbosity=1)
def test_basic(self): def test_basic(self):
for i in (1, 2, 3): for i in (1, 2, 3):
assert "GET" in self._dummy_cycle("~s", "", verbosity=i, eventlog=True) assert "GET" in self._dummy_cycle(1, "~s", "", verbosity=i, eventlog=True)
assert "GET" in self._dummy_cycle("~s", "\x00\x00\x00", verbosity=i) assert "GET" in self._dummy_cycle(1, "~s", "\x00\x00\x00", verbosity=i)
assert "GET" in self._dummy_cycle("~s", "ascii", verbosity=i) assert "GET" in self._dummy_cycle(1, "~s", "ascii", verbosity=i)
def test_write(self): def test_write(self):
d = self.tmpdir() d = self.tmpdir()
p = os.path.join(d, "a") p = os.path.join(d, "a")
self._dummy_cycle(None, "", wfile=p, verbosity=0) self._dummy_cycle(1, None, "", wfile=p, verbosity=0)
assert len(list(flow.FlowReader(open(p)).stream())) == 1 assert len(list(flow.FlowReader(open(p)).stream())) == 1
def test_write_err(self): def test_write_err(self):
libpry.raises( libpry.raises(
dump.DumpError, dump.DumpError,
self._dummy_cycle, self._dummy_cycle,
1,
None, None,
"", "",
wfile = "nonexistentdir/foo" wfile = "nonexistentdir/foo"
) )
def test_request_script(self): def test_request_script(self):
ret = self._dummy_cycle(None, "", request_script="scripts/a", verbosity=1) ret = self._dummy_cycle(1, None, "", request_script="scripts/a", verbosity=1)
assert "TESTOK" in ret assert "TESTOK" in ret
assert "DEBUG" in ret assert "DEBUG" in ret
libpry.raises( libpry.raises(
dump.DumpError, dump.DumpError,
self._dummy_cycle, None, "", request_script="nonexistent" self._dummy_cycle, 1, None, "", request_script="nonexistent"
) )
libpry.raises( libpry.raises(
dump.DumpError, dump.DumpError,
self._dummy_cycle, None, "", request_script="scripts/err_return" self._dummy_cycle, 1, None, "", request_script="scripts/err_return"
) )
def test_response_script(self): def test_response_script(self):
ret = self._dummy_cycle(None, "", response_script="scripts/a", verbosity=1) ret = self._dummy_cycle(1, None, "", response_script="scripts/a", verbosity=1)
assert "TESTOK" in ret assert "TESTOK" in ret
assert "DEBUG" in ret assert "DEBUG" in ret
libpry.raises( libpry.raises(
dump.DumpError, dump.DumpError,
self._dummy_cycle, None, "", response_script="nonexistent" self._dummy_cycle, 1, None, "", response_script="nonexistent"
) )
libpry.raises( libpry.raises(
dump.DumpError, dump.DumpError,
self._dummy_cycle, None, "", response_script="scripts/err_return" self._dummy_cycle, 1, None, "", response_script="scripts/err_return"
) )
def test_stickycookie(self): def test_stickycookie(self):
ret = self._dummy_cycle(None, "", stickycookie = ".*") ret = self._dummy_cycle(1, None, "", stickycookie = ".*")
def test_stickyauth(self): def test_stickyauth(self):
ret = self._dummy_cycle(None, "", stickyauth = ".*") ret = self._dummy_cycle(1, None, "", stickyauth = ".*")

View File

@ -200,6 +200,22 @@ class uFlow(libpry.AutoTree):
f.kill(fm) f.kill(fm)
assert f.response.acked assert f.response.acked
def test_killall(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
r = tutils.treq()
fm.handle_request(r)
r = tutils.treq()
fm.handle_request(r)
for i in s.view:
assert not i.request.acked
s.killall(fm)
for i in s.view:
assert i.request.acked
def test_accept_intercept(self): def test_accept_intercept(self):
f = tutils.tflow() f = tutils.tflow()
f.request = tutils.treq() f.request = tutils.treq()
@ -234,6 +250,10 @@ class uFlow(libpry.AutoTree):
assert f.response.headers["bar"] == ["bar"] assert f.response.headers["bar"] == ["bar"]
assert f.response.content == "abarb" assert f.response.content == "abarb"
f = tutils.tflow_err()
f.replace("error", "bar")
assert f.error.msg == "bar"
class uState(libpry.AutoTree): class uState(libpry.AutoTree):
def test_backup(self): def test_backup(self):
@ -294,6 +314,7 @@ class uState(libpry.AutoTree):
e = proxy.Error(tutils.tflow().request, "message") e = proxy.Error(tutils.tflow().request, "message")
assert not c.add_error(e) assert not c.add_error(e)
def test_set_limit(self): def test_set_limit(self):
c = flow.State() c = flow.State()
@ -304,6 +325,7 @@ class uState(libpry.AutoTree):
assert len(c.view) == 1 assert len(c.view) == 1
c.set_limit("~s") c.set_limit("~s")
assert c.limit_txt == "~s"
assert len(c.view) == 0 assert len(c.view) == 0
resp = tutils.tresp(req) resp = tutils.tresp(req)
c.add_response(resp) c.add_response(resp)
@ -447,6 +469,7 @@ class uFlowMaster(libpry.AutoTree):
assert not fm.handle_response(rx) assert not fm.handle_response(rx)
dc = proxy.ClientDisconnect(req.client_conn) dc = proxy.ClientDisconnect(req.client_conn)
fm.handle_clientdisconnect(dc)
err = proxy.Error(f.request, "msg") err = proxy.Error(f.request, "msg")
fm.handle_error(err) fm.handle_error(err)

View File

@ -29,6 +29,12 @@ class uhexdump(libpry.AutoTree):
def test_simple(self): def test_simple(self):
assert utils.hexdump("one\0"*10) assert utils.hexdump("one\0"*10)
class udel_all(libpry.AutoTree):
def test_simple(self):
d = dict(a=1, b=2, c=3)
utils.del_all(d, ["a", "x", "b"])
assert d.keys() == ["c"]
class upretty_size(libpry.AutoTree): class upretty_size(libpry.AutoTree):
def test_simple(self): def test_simple(self):
@ -300,6 +306,7 @@ tests = [
upretty_xmlish(), upretty_xmlish(),
upretty_json(), upretty_json(),
u_urldecode(), u_urldecode(),
udel_all(),
udummy_ca(), udummy_ca(),
udummy_cert(), udummy_cert(),
uLRUCache(), uLRUCache(),