From b7b357528c3d3867f1a29400de3a11f2d976e60d Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sat, 9 Jun 2012 13:42:43 +1200 Subject: [PATCH] Port mitmproxy test suite entirely to nose. --- .coveragerc | 2 + .gitignore | 1 + libmproxy/flow.py | 8 +- libmproxy/wsgi.py | 6 +- test/test_certutils.py | 74 ++++++------- test/test_cmdline.py | 150 +++++++++++++------------- test/test_console.py | 55 ++++------ test/test_console_contentview.py | 24 +++-- test/test_console_help.py | 8 +- test/test_dump.py | 87 +++++++-------- test/test_encoding.py | 35 +++--- test/test_filt.py | 13 +-- test/test_flow.py | 176 ++++++++++++++----------------- test/test_proxy.py | 170 +++++++++++++---------------- test/test_script.py | 27 ++--- test/test_wsgi.py | 11 +- test/tutils.py | 21 +++- 17 files changed, 388 insertions(+), 480 deletions(-) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 000000000..7ba22a38f --- /dev/null +++ b/.coveragerc @@ -0,0 +1,2 @@ +[report] +omit = *contrib* diff --git a/.gitignore b/.gitignore index dceac379c..78b1cdb59 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ mitmproxyc mitmdumpc mitmplaybackc mitmrecordc +.coverage diff --git a/libmproxy/flow.py b/libmproxy/flow.py index 6b3d868ed..bb9d34f86 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -903,9 +903,7 @@ class ClientPlaybackState: n.request.client_conn = None self.current = master.handle_request(n.request) if not testing and not self.current.response: - #begin nocover - master.replay_request(self.current) - #end nocover + master.replay_request(self.current) # pragma: no cover elif self.current.response: master.handle_response(self.current.response) @@ -1498,7 +1496,6 @@ class FlowMaster(controller.Master): return "Can't replay while intercepting..." if f.request.content == CONTENT_MISSING: return "Can't replay request with missing content..." - #begin nocover if f.request: f.request._set_replay() if f.request.content: @@ -1511,8 +1508,7 @@ class FlowMaster(controller.Master): f, self.masterq, ) - rt.start() - #end nocover + rt.start() # pragma: no cover def run_script_hook(self, name, *args, **kwargs): if self.script and not self.pause_scripts: diff --git a/libmproxy/wsgi.py b/libmproxy/wsgi.py index ce0367fb9..b4555312d 100644 --- a/libmproxy/wsgi.py +++ b/libmproxy/wsgi.py @@ -118,10 +118,8 @@ class WSGIAdaptor: try: s = traceback.format_exc() self.error_page(soc, state["headers_sent"], s) - # begin nocover - except Exception, v: - pass - # end nocover + except Exception, v: # pragma: no cover + pass # pragma: no cover return errs.getvalue() diff --git a/test/test_certutils.py b/test/test_certutils.py index 9b89599a1..1fd6cbb20 100644 --- a/test/test_certutils.py +++ b/test/test_certutils.py @@ -1,11 +1,10 @@ import os -import libpry from libmproxy import certutils +import tutils -class udummy_ca(libpry.AutoTree): - def test_all(self): - d = self.tmpdir() +def test_dummy_ca(): + with tutils.tmpdir() as d: path = os.path.join(d, "foo/cert.cnf") assert certutils.dummy_ca(path) assert os.path.exists(path) @@ -17,45 +16,45 @@ class udummy_ca(libpry.AutoTree): assert os.path.exists(os.path.join(d, "foo/cert2-cert.p12")) -class udummy_cert(libpry.AutoTree): +class TestDummyCert: def test_with_ca(self): - d = self.tmpdir() - cacert = os.path.join(d, "foo/cert.cnf") - assert certutils.dummy_ca(cacert) - p = certutils.dummy_cert( - os.path.join(d, "foo"), - cacert, - "foo.com", - ["one.com", "two.com", "*.three.com"] - ) - assert os.path.exists(p) + with tutils.tmpdir() as d: + cacert = os.path.join(d, "foo/cert.cnf") + assert certutils.dummy_ca(cacert) + p = certutils.dummy_cert( + os.path.join(d, "foo"), + cacert, + "foo.com", + ["one.com", "two.com", "*.three.com"] + ) + assert os.path.exists(p) - # Short-circuit - assert certutils.dummy_cert( - os.path.join(d, "foo"), - cacert, - "foo.com", - [] - ) + # Short-circuit + assert certutils.dummy_cert( + os.path.join(d, "foo"), + cacert, + "foo.com", + [] + ) def test_no_ca(self): - d = self.tmpdir() - p = certutils.dummy_cert( - d, - None, - "foo.com", - [] - ) - assert os.path.exists(p) + with tutils.tmpdir() as d: + p = certutils.dummy_cert( + d, + None, + "foo.com", + [] + ) + assert os.path.exists(p) -class uSSLCert(libpry.AutoTree): +class TestSSLCert: def test_simple(self): - c = certutils.SSLCert(file("data/text_cert", "r").read()) + c = certutils.SSLCert(file(tutils.test_data.path("data/text_cert"), "r").read()) assert c.cn == "google.com" assert len(c.altnames) == 436 - c = certutils.SSLCert(file("data/text_cert_2", "r").read()) + c = certutils.SSLCert(file(tutils.test_data.path("data/text_cert_2"), "r").read()) assert c.cn == "www.inode.co.nz" assert len(c.altnames) == 2 assert c.digest("sha1") @@ -68,13 +67,6 @@ class uSSLCert(libpry.AutoTree): c.has_expired def test_der(self): - d = file("data/dercert").read() + d = file(tutils.test_data.path("data/dercert")).read() s = certutils.SSLCert.from_der(d) assert s.cn - - -tests = [ - udummy_ca(), - udummy_cert(), - uSSLCert(), -] diff --git a/test/test_cmdline.py b/test/test_cmdline.py index 03f230a69..45c0b3e6b 100644 --- a/test/test_cmdline.py +++ b/test/test_cmdline.py @@ -1,95 +1,89 @@ import optparse -import libpry from libmproxy import cmdline +import tutils -class uAll(libpry.AutoTree): - def test_parse_replace_hook(self): - x = cmdline.parse_replace_hook("/foo/bar/voing") - assert x == ("foo", "bar", "voing") +def test_parse_replace_hook(): + x = cmdline.parse_replace_hook("/foo/bar/voing") + assert x == ("foo", "bar", "voing") - x = cmdline.parse_replace_hook("/foo/bar/vo/ing/") - assert x == ("foo", "bar", "vo/ing/") + x = cmdline.parse_replace_hook("/foo/bar/vo/ing/") + assert x == ("foo", "bar", "vo/ing/") - x = cmdline.parse_replace_hook("/bar/voing") - assert x == (".*", "bar", "voing") + x = cmdline.parse_replace_hook("/bar/voing") + assert x == (".*", "bar", "voing") - libpry.raises( - cmdline.ParseReplaceException, - cmdline.parse_replace_hook, - "/foo" - ) - libpry.raises( - "replacement regex", - cmdline.parse_replace_hook, - "patt/[/rep" - ) - libpry.raises( - "filter pattern", - cmdline.parse_replace_hook, - "/~/foo/rep" - ) - libpry.raises( - "empty replacement regex", - cmdline.parse_replace_hook, - "//" - ) + tutils.raises( + cmdline.ParseReplaceException, + cmdline.parse_replace_hook, + "/foo" + ) + tutils.raises( + "replacement regex", + cmdline.parse_replace_hook, + "patt/[/rep" + ) + tutils.raises( + "filter pattern", + cmdline.parse_replace_hook, + "/~/foo/rep" + ) + tutils.raises( + "empty replacement regex", + cmdline.parse_replace_hook, + "//" + ) - def test_common(self): - parser = optparse.OptionParser() - cmdline.common_options(parser) - opts, args = parser.parse_args(args=[]) +def test_common(): + parser = optparse.OptionParser() + cmdline.common_options(parser) + opts, args = parser.parse_args(args=[]) - assert cmdline.get_common_options(opts) + assert cmdline.get_common_options(opts) - opts.stickycookie_all = True - opts.stickyauth_all = True - v = cmdline.get_common_options(opts) - assert v["stickycookie"] == ".*" - assert v["stickyauth"] == ".*" + opts.stickycookie_all = True + opts.stickyauth_all = True + v = cmdline.get_common_options(opts) + assert v["stickycookie"] == ".*" + assert v["stickyauth"] == ".*" - opts.stickycookie_all = False - opts.stickyauth_all = False - opts.stickycookie_filt = "foo" - opts.stickyauth_filt = "foo" - v = cmdline.get_common_options(opts) - assert v["stickycookie"] == "foo" - assert v["stickyauth"] == "foo" + opts.stickycookie_all = False + opts.stickyauth_all = False + opts.stickycookie_filt = "foo" + opts.stickyauth_filt = "foo" + v = cmdline.get_common_options(opts) + assert v["stickycookie"] == "foo" + assert v["stickyauth"] == "foo" - opts.replace = ["/foo/bar/voing"] - v = cmdline.get_common_options(opts) - assert v["replacements"] == [("foo", "bar", "voing")] + opts.replace = ["/foo/bar/voing"] + v = cmdline.get_common_options(opts) + assert v["replacements"] == [("foo", "bar", "voing")] - opts.replace = ["//"] - libpry.raises( - "empty replacement regex", - cmdline.get_common_options, - opts - ) + opts.replace = ["//"] + tutils.raises( + "empty replacement regex", + cmdline.get_common_options, + opts + ) - opts.replace = [] - opts.replace_file = [("/foo/bar/nonexistent")] - libpry.raises( - "could not read replace file", - cmdline.get_common_options, - opts - ) + opts.replace = [] + opts.replace_file = [("/foo/bar/nonexistent")] + tutils.raises( + "could not read replace file", + cmdline.get_common_options, + opts + ) - opts.replace_file = [("/~/bar/nonexistent")] - libpry.raises( - "filter pattern", - cmdline.get_common_options, - opts - ) + opts.replace_file = [("/~/bar/nonexistent")] + tutils.raises( + "filter pattern", + cmdline.get_common_options, + opts + ) - opts.replace_file = [("/foo/bar/./data/replace")] - v = cmdline.get_common_options(opts)["replacements"] - assert len(v) == 1 - assert v[0][2].strip() == "replacecontents" - - - -tests = [ - uAll() -] + p = tutils.test_data.path("data/replace") + opts.replace_file = [("/foo/bar/%s"%p)] + v = cmdline.get_common_options(opts)["replacements"] + assert len(v) == 1 + assert v[0][2].strip() == "replacecontents" diff --git a/test/test_console.py b/test/test_console.py index a570f3e5d..498a93bd4 100644 --- a/test/test_console.py +++ b/test/test_console.py @@ -1,10 +1,9 @@ +import os from libmproxy import console from libmproxy.console import common import tutils -import libpry - -class uConsoleState(libpry.AutoTree): +class TestConsoleState: def test_flow(self): """ normal flow: @@ -89,32 +88,34 @@ class uConsoleState(libpry.AutoTree): assert len(c.flowsettings) == 0 -class uformat_keyvals(libpry.AutoTree): - def test_simple(self): - assert common.format_keyvals( - [ - ("aa", "bb"), - None, - ("cc", "dd"), - (None, "dd"), - (None, "dd"), - ] - ) +def test_format_keyvals(): + assert common.format_keyvals( + [ + ("aa", "bb"), + None, + ("cc", "dd"), + (None, "dd"), + (None, "dd"), + ] + ) -class uPathCompleter(libpry.AutoTree): +class TestPathCompleter: def test_lookup_construction(self): c = console._PathCompleter() assert c.complete("/tm") == "/tmp/" c.reset() - assert c.complete("./completion/a") == "./completion/aaa" - assert c.complete("./completion/a") == "./completion/aab" + cd = tutils.test_data.path("completion") + ca = os.path.join(cd, "a") + assert c.complete(ca).endswith("/completion/aaa") + assert c.complete(ca).endswith("/completion/aab") c.reset() - assert c.complete("./completion/aaa") == "./completion/aaa" - assert c.complete("./completion/aaa") == "./completion/aaa" + ca = os.path.join(cd, "aaa") + assert c.complete(ca).endswith("/completion/aaa") + assert c.complete(ca).endswith("/completion/aaa") c.reset() - assert c.complete("./completion") == "./completion/aaa" + assert c.complete(cd).endswith("/completion/aaa") def test_completion(self): c = console._PathCompleter(True) @@ -144,15 +145,5 @@ class uPathCompleter(libpry.AutoTree): assert c.final == s -class uOptions(libpry.AutoTree): - def test_all(self): - assert console.Options(kill=True) - - - -tests = [ - uformat_keyvals(), - uConsoleState(), - uPathCompleter(), - uOptions() -] +def test_options(): + assert console.Options(kill=True) diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py index bd91d2eba..fbb7e6d24 100644 --- a/test/test_console_contentview.py +++ b/test/test_console_contentview.py @@ -1,9 +1,9 @@ import sys -import libpry import libmproxy.console.contentview as cv from libmproxy import utils, flow, encoding +import tutils -class uContentView(libpry.AutoTree): +class TestContentView: def test_trailer(self): txt = [] cv.trailer(5, txt, 1000) @@ -97,10 +97,18 @@ class uContentView(libpry.AutoTree): assert cv.view_hex([], "foo", 1000) def test_view_image(self): - assert cv.view_image([], file("data/image.png").read(), sys.maxint) - assert cv.view_image([], file("data/image.gif").read(), sys.maxint) - assert cv.view_image([], file("data/image-err1.jpg").read(), sys.maxint) - assert cv.view_image([], file("data/image.ico").read(), sys.maxint) + p = tutils.test_data.path("data/image.png") + assert cv.view_image([], file(p).read(), sys.maxint) + + p = tutils.test_data.path("data/image.gif") + assert cv.view_image([], file(p).read(), sys.maxint) + + p = tutils.test_data.path("data/image-err1.jpg") + assert cv.view_image([], file(p).read(), sys.maxint) + + p = tutils.test_data.path("data/image.ico") + assert cv.view_image([], file(p).read(), sys.maxint) + assert not cv.view_image([], "flibble", sys.maxint) def test_view_multipart(self): @@ -178,7 +186,3 @@ Larry assert "decoded gzip" in r[0] assert "Raw" in r[0] - -tests = [ - uContentView() -] diff --git a/test/test_console_help.py b/test/test_console_help.py index e747635f3..c9ef72282 100644 --- a/test/test_console_help.py +++ b/test/test_console_help.py @@ -1,5 +1,4 @@ import sys -import libpry import libmproxy.console.help as help from libmproxy import utils, flow, encoding @@ -8,7 +7,7 @@ class DummyMaster: pass -class uHelp(libpry.AutoTree): +class TestHelp: def test_helptext(self): h = help.HelpView(None, "foo", None) assert h.helptext() @@ -18,8 +17,3 @@ class uHelp(libpry.AutoTree): assert not h.keypress((0, 0), "q") assert not h.keypress((0, 0), "?") assert h.keypress((0, 0), "o") == "o" - - -tests = [ - uHelp() -] diff --git a/test/test_dump.py b/test/test_dump.py index b2b5d8cdd..621e76e75 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -4,20 +4,19 @@ import libpry from libmproxy import dump, flow import tutils -class uStrFuncs(libpry.AutoTree): - def test_all(self): - t = tutils.tresp() - t._set_replay() - dump.str_response(t) +def test_strfuncs(): + t = tutils.tresp() + t._set_replay() + dump.str_response(t) - t = tutils.treq() - t.client_conn = None - t.stickycookie = True - assert "stickycookie" in dump.str_request(t) - assert "replay" in dump.str_request(t) + t = tutils.treq() + t.client_conn = None + t.stickycookie = True + assert "stickycookie" in dump.str_request(t) + assert "replay" in dump.str_request(t) -class uDumpMaster(libpry.AutoTree): +class TestDumpMaster: def _cycle(self, m, content): req = tutils.treq() req.content = content @@ -53,38 +52,38 @@ class uDumpMaster(libpry.AutoTree): o = dump.Options(server_replay="nonexistent", kill=True) libpry.raises(dump.DumpError, dump.DumpMaster, None, o, None, outfile=cs) - t = self.tmpdir() - p = os.path.join(t, "rep") - self._flowfile(p) + with tutils.tmpdir() as t: + p = os.path.join(t, "rep") + self._flowfile(p) - o = dump.Options(server_replay=p, kill=True) - m = dump.DumpMaster(None, o, None, outfile=cs) + o = dump.Options(server_replay=p, kill=True) + m = dump.DumpMaster(None, o, None, outfile=cs) - self._cycle(m, "content") - self._cycle(m, "content") + self._cycle(m, "content") + self._cycle(m, "content") - o = dump.Options(server_replay=p, kill=False) - m = dump.DumpMaster(None, o, None, outfile=cs) - self._cycle(m, "nonexistent") + o = dump.Options(server_replay=p, kill=False) + m = dump.DumpMaster(None, o, None, outfile=cs) + self._cycle(m, "nonexistent") - o = dump.Options(client_replay=p, kill=False) - m = dump.DumpMaster(None, o, None, outfile=cs) + o = dump.Options(client_replay=p, kill=False) + m = dump.DumpMaster(None, o, None, outfile=cs) def test_read(self): - t = self.tmpdir() - p = os.path.join(t, "read") - self._flowfile(p) - assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p) + with tutils.tmpdir() as t: + p = os.path.join(t, "read") + self._flowfile(p) + assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p) - libpry.raises( - dump.DumpError, self._dummy_cycle, - 0, None, "", verbosity=1, rfile="/nonexistent" - ) + libpry.raises( + dump.DumpError, self._dummy_cycle, + 0, None, "", verbosity=1, rfile="/nonexistent" + ) - libpry.raises( - dump.DumpError, self._dummy_cycle, - 0, None, "", verbosity=1, rfile="test_dump.py" - ) + libpry.raises( + dump.DumpError, self._dummy_cycle, + 0, None, "", verbosity=1, rfile="test_dump.py" + ) def test_options(self): o = dump.Options(verbosity = 2) @@ -107,10 +106,10 @@ class uDumpMaster(libpry.AutoTree): assert "GET" in self._dummy_cycle(1, "~s", "ascii", verbosity=i) def test_write(self): - d = self.tmpdir() - p = os.path.join(d, "a") - self._dummy_cycle(1, None, "", wfile=p, verbosity=0) - assert len(list(flow.FlowReader(open(p)).stream())) == 1 + with tutils.tmpdir() as d: + p = os.path.join(d, "a") + self._dummy_cycle(1, None, "", wfile=p, verbosity=0) + assert len(list(flow.FlowReader(open(p)).stream())) == 1 def test_write_err(self): libpry.raises( @@ -125,7 +124,7 @@ class uDumpMaster(libpry.AutoTree): def test_script(self): ret = self._dummy_cycle( 1, None, "", - script="scripts/all.py", verbosity=0, eventlog=True + script=tutils.test_data.path("scripts/all.py"), verbosity=0, eventlog=True ) assert "XCLIENTCONNECT" in ret assert "XREQUEST" in ret @@ -146,11 +145,3 @@ class uDumpMaster(libpry.AutoTree): def test_stickyauth(self): self._dummy_cycle(1, None, "", stickyauth = ".*") - - - - -tests = [ - uStrFuncs(), - uDumpMaster() -] diff --git a/test/test_encoding.py b/test/test_encoding.py index 61caf507b..732447e2f 100644 --- a/test/test_encoding.py +++ b/test/test_encoding.py @@ -1,28 +1,19 @@ from libmproxy import encoding -import libpry -class uidentity(libpry.AutoTree): - def test_simple(self): - assert "string" == encoding.decode("identity", "string") - assert "string" == encoding.encode("identity", "string") - assert not encoding.encode("nonexistent", "string") +def test_identity(): + assert "string" == encoding.decode("identity", "string") + assert "string" == encoding.encode("identity", "string") + assert not encoding.encode("nonexistent", "string") + assert None == encoding.decode("nonexistent encoding", "string") - def test_fallthrough(self): - assert None == encoding.decode("nonexistent encoding", "string") -class ugzip(libpry.AutoTree): - def test_simple(self): - assert "string" == encoding.decode("gzip", encoding.encode("gzip", "string")) - assert None == encoding.decode("gzip", "bogus") +def test_gzip(): + assert "string" == encoding.decode("gzip", encoding.encode("gzip", "string")) + assert None == encoding.decode("gzip", "bogus") -class udeflate(libpry.AutoTree): - def test_simple(self): - assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")) - assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")[2:-4]) - assert None == encoding.decode("deflate", "bogus") -tests = [ - uidentity(), - ugzip(), - udeflate() -] +def test_deflate(): + assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")) + assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")[2:-4]) + assert None == encoding.decode("deflate", "bogus") + diff --git a/test/test_filt.py b/test/test_filt.py index 8e349d7c0..287ef376c 100644 --- a/test/test_filt.py +++ b/test/test_filt.py @@ -1,9 +1,8 @@ import cStringIO from libmproxy import filt, flow -import libpry -class uParsing(libpry.AutoTree): +class TestParsing: def _dump(self, x): c = cStringIO.StringIO() x.dump(fp=c) @@ -71,7 +70,7 @@ class uParsing(libpry.AutoTree): self._dump(a) -class uMatching(libpry.AutoTree): +class TestMatching: def req(self): conn = flow.ClientConnect(("one", 2222)) headers = flow.ODictCaseless() @@ -235,11 +234,3 @@ class uMatching(libpry.AutoTree): assert self.q("!~c 201 !~c 202", s) assert not self.q("!~c 201 !~c 200", s) - - - - -tests = [ - uMatching(), - uParsing() -] diff --git a/test/test_flow.py b/test/test_flow.py index 0b7fee4f5..994264e4c 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -6,7 +6,7 @@ import tutils import libpry -class uStickyCookieState(libpry.AutoTree): +class TestStickyCookieState: def _response(self, cookie, host): s = flow.StickyCookieState(filt.parse(".*")) f = tutils.tflow_full() @@ -40,7 +40,7 @@ class uStickyCookieState(libpry.AutoTree): assert "cookie" in f.request.headers -class uStickyAuthState(libpry.AutoTree): +class TestStickyAuthState: def test_handle_response(self): s = flow.StickyAuthState(filt.parse(".*")) f = tutils.tflow_full() @@ -53,7 +53,7 @@ class uStickyAuthState(libpry.AutoTree): assert f.request.headers["authorization"] == ["foo"] -class uClientPlaybackState(libpry.AutoTree): +class TestClientPlaybackState: def test_tick(self): first = tutils.tflow() s = flow.State() @@ -85,7 +85,7 @@ class uClientPlaybackState(libpry.AutoTree): assert not fm.client_playback -class uServerPlaybackState(libpry.AutoTree): +class TestServerPlaybackState: def test_hash(self): s = flow.ServerPlaybackState(None, [], False, False) r = tutils.tflow() @@ -147,7 +147,8 @@ class uServerPlaybackState(libpry.AutoTree): n = s.next_flow(r) assert s.count() == 2 -class uFlow(libpry.AutoTree): + +class TestFlow: def test_copy(self): f = tutils.tflow_full() f2 = f.copy() @@ -302,7 +303,7 @@ class uFlow(libpry.AutoTree): -class uState(libpry.AutoTree): +class TestState: def test_backup(self): c = flow.State() req = tutils.treq() @@ -451,7 +452,7 @@ class uState(libpry.AutoTree): c.accept_all() -class uSerialize(libpry.AutoTree): +class TestSerialize: def _treader(self): sio = StringIO() w = flow.FlowWriter(sio) @@ -494,7 +495,7 @@ class uSerialize(libpry.AutoTree): sio.write("bogus") sio.seek(0) r = flow.FlowReader(sio) - libpry.raises(flow.FlowReadError, list, r.stream()) + tutils.raises(flow.FlowReadError, list, r.stream()) f = flow.FlowReadError("foo") assert f.strerror == "foo" @@ -508,18 +509,18 @@ class uSerialize(libpry.AutoTree): sio.seek(0) r = flow.FlowReader(sio) - libpry.raises("version", list, r.stream()) + tutils.raises("version", list, r.stream()) -class uFlowMaster(libpry.AutoTree): +class TestFlowMaster: def test_load_script(self): s = flow.State() fm = flow.FlowMaster(None, s) - assert not fm.load_script("scripts/a.py") - assert not fm.load_script("scripts/a.py") + assert not fm.load_script(tutils.test_data.path("scripts/a.py")) + assert not fm.load_script(tutils.test_data.path("scripts/a.py")) assert not fm.load_script(None) assert fm.load_script("nonexistent") - assert "ValueError" in fm.load_script("scripts/starterr.py") + assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py")) def test_replay(self): s = flow.State() @@ -534,7 +535,7 @@ class uFlowMaster(libpry.AutoTree): def test_script_reqerr(self): s = flow.State() fm = flow.FlowMaster(None, s) - assert not fm.load_script("scripts/reqerr.py") + assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py")) req = tutils.treq() fm.handle_clientconnect(req.client_conn) assert fm.handle_request(req) @@ -542,7 +543,7 @@ class uFlowMaster(libpry.AutoTree): def test_script(self): s = flow.State() fm = flow.FlowMaster(None, s) - assert not fm.load_script("scripts/all.py") + assert not fm.load_script(tutils.test_data.path("scripts/all.py")) req = tutils.treq() fm.handle_clientconnect(req.client_conn) assert fm.script.ns["log"][-1] == "clientconnect" @@ -680,7 +681,7 @@ class uFlowMaster(libpry.AutoTree): fm.handle_request(f.request) assert f.request.headers["authorization"] == ["foo"] -class uRequest(libpry.AutoTree): +class TestRequest: def test_simple(self): h = flow.ODictCaseless() h["test"] = ["test"] @@ -819,7 +820,7 @@ class uRequest(libpry.AutoTree): assert r.content == "falafel" -class uResponse(libpry.AutoTree): +class TestResponse: def test_simple(self): h = flow.ODictCaseless() h["test"] = ["test"] @@ -869,7 +870,10 @@ class uResponse(libpry.AutoTree): def test_get_cert(self): req = tutils.treq() - resp = flow.Response(req, 200, "msg", flow.ODictCaseless(), "content", file("data/dercert").read()) + resp = flow.Response( + req, 200, "msg", flow.ODictCaseless(), "content", + file(tutils.test_data.path("data/dercert")).read() + ) assert resp.get_cert() resp = tutils.tresp() @@ -924,7 +928,7 @@ class uResponse(libpry.AutoTree): assert r.content == "falafel" -class uError(libpry.AutoTree): +class TestError: def test_getset_state(self): e = flow.Error(None, "Error") state = e._get_state() @@ -947,7 +951,7 @@ class uError(libpry.AutoTree): assert e.msg == "abarp" -class uClientConnect(libpry.AutoTree): +class TestClientConnect: def test_state(self): c = flow.ClientConnect(("a", 22)) assert flow.ClientConnect._from_state(c._get_state()) == c @@ -963,13 +967,13 @@ class uClientConnect(libpry.AutoTree): assert c3 == c -class uODict(libpry.AutoTree): +class TestODict: def setUp(self): self.od = flow.ODict() def test_str_err(self): h = flow.ODict() - libpry.raises(ValueError, h.__setitem__, "key", "foo") + tutils.raises(ValueError, h.__setitem__, "key", "foo") def test_dictToHeader1(self): self.od.add("one", "uno") @@ -1047,7 +1051,7 @@ class uODict(libpry.AutoTree): assert self.od.get("two") == None -class uODictCaseless(libpry.AutoTree): +class TestODictCaseless: def setUp(self): self.od = flow.ODictCaseless() @@ -1068,89 +1072,67 @@ class uODictCaseless(libpry.AutoTree): assert len(self.od) == 1 -class udecoded(libpry.AutoTree): - def test_del(self): - r = tutils.treq() - assert r.content == "content" +def test_decoded(): + r = tutils.treq() + assert r.content == "content" + assert not r.headers["content-encoding"] + r.encode("gzip") + assert r.headers["content-encoding"] + assert r.content != "content" + with flow.decoded(r): assert not r.headers["content-encoding"] - r.encode("gzip") - assert r.headers["content-encoding"] - assert r.content != "content" - with flow.decoded(r): - assert not r.headers["content-encoding"] - assert r.content == "content" - assert r.headers["content-encoding"] - assert r.content != "content" + assert r.content == "content" + assert r.headers["content-encoding"] + assert r.content != "content" - with flow.decoded(r): - r.content = "foo" + with flow.decoded(r): + r.content = "foo" - assert r.content != "foo" - r.decode() - assert r.content == "foo" + assert r.content != "foo" + r.decode() + assert r.content == "foo" -class uReplaceHooks(libpry.AutoTree): - def test_add_remove(self): - h = flow.ReplaceHooks() - h.add("~q", "foo", "bar") - assert h.lst - h.remove("~q", "foo", "bar") - assert not h.lst +def test_replacehooks(): + h = flow.ReplaceHooks() + h.add("~q", "foo", "bar") + assert h.lst + h.remove("~q", "foo", "bar") + assert not h.lst - h.add("~q", "foo", "bar") - h.add("~s", "foo", "bar") + h.add("~q", "foo", "bar") + h.add("~s", "foo", "bar") - v = h.get_specs() - assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')] - assert h.count() == 2 - h.remove("~q", "foo", "bar") - assert h.count() == 1 - h.remove("~q", "foo", "bar") - assert h.count() == 1 - h.clear() - assert h.count() == 0 + v = h.get_specs() + assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')] + assert h.count() == 2 + h.remove("~q", "foo", "bar") + assert h.count() == 1 + h.remove("~q", "foo", "bar") + assert h.count() == 1 + h.clear() + assert h.count() == 0 - f = tutils.tflow() - f.request.content = "foo" - h.add("~s", "foo", "bar") - h.run(f) - assert f.request.content == "foo" + f = tutils.tflow() + f.request.content = "foo" + h.add("~s", "foo", "bar") + h.run(f) + assert f.request.content == "foo" - f = tutils.tflow_full() - f.request.content = "foo" - f.response.content = "foo" - h.run(f) - assert f.response.content == "bar" - assert f.request.content == "foo" + f = tutils.tflow_full() + f.request.content = "foo" + f.response.content = "foo" + h.run(f) + assert f.response.content == "bar" + assert f.request.content == "foo" - f = tutils.tflow() - h.clear() - h.add("~q", "foo", "bar") - f.request.content = "foo" - h.run(f) - assert f.request.content == "bar" + f = tutils.tflow() + h.clear() + h.add("~q", "foo", "bar") + f.request.content = "foo" + h.run(f) + assert f.request.content == "bar" - assert not h.add("~", "foo", "bar") - assert not h.add("foo", "*", "bar") + assert not h.add("~", "foo", "bar") + assert not h.add("foo", "*", "bar") - - -tests = [ - uReplaceHooks(), - uStickyCookieState(), - uStickyAuthState(), - uServerPlaybackState(), - uClientPlaybackState(), - uFlow(), - uState(), - uSerialize(), - uFlowMaster(), - uRequest(), - uResponse(), - uError(), - uClientConnect(), - uODict(), - uODictCaseless(), - udecoded() -] diff --git a/test/test_proxy.py b/test/test_proxy.py index c67e93a02..a5dc666ac 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -2,60 +2,57 @@ import cStringIO, textwrap from cStringIO import StringIO import libpry from libmproxy import proxy, flow - - -class u_read_chunked(libpry.AutoTree): - def test_all(self): - s = cStringIO.StringIO("1\r\na\r\n0\r\n") - libpry.raises(IOError, proxy.read_chunked, s, None) - - s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n") - assert proxy.read_chunked(s, None) == "a" - - s = cStringIO.StringIO("\r\n") - libpry.raises(IOError, proxy.read_chunked, s, None) - - s = cStringIO.StringIO("1\r\nfoo") - libpry.raises(IOError, proxy.read_chunked, s, None) - - s = cStringIO.StringIO("foo\r\nfoo") - libpry.raises(proxy.ProxyError, proxy.read_chunked, s, None) - +import tutils class Dummy: pass -class u_read_http_body(libpry.AutoTree): - def test_all(self): +def test_read_chunked(): + s = cStringIO.StringIO("1\r\na\r\n0\r\n") + tutils.raises(IOError, proxy.read_chunked, s, None) - d = Dummy() - h = flow.ODict() - s = cStringIO.StringIO("testing") - assert proxy.read_http_body(s, d, h, False, None) == "" + s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n") + assert proxy.read_chunked(s, None) == "a" - h["content-length"] = ["foo"] - s = cStringIO.StringIO("testing") - libpry.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, None) + s = cStringIO.StringIO("\r\n") + tutils.raises(IOError, proxy.read_chunked, s, None) - h["content-length"] = [5] - s = cStringIO.StringIO("testing") - assert len(proxy.read_http_body(s, d, h, False, None)) == 5 - s = cStringIO.StringIO("testing") - libpry.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, 4) + s = cStringIO.StringIO("1\r\nfoo") + tutils.raises(IOError, proxy.read_chunked, s, None) - h = flow.ODict() - s = cStringIO.StringIO("testing") - assert len(proxy.read_http_body(s, d, h, True, 4)) == 4 - s = cStringIO.StringIO("testing") - assert len(proxy.read_http_body(s, d, h, True, 100)) == 7 + s = cStringIO.StringIO("foo\r\nfoo") + tutils.raises(proxy.ProxyError, proxy.read_chunked, s, None) -class u_parse_request_line(libpry.AutoTree): +def test_read_http_body(): + d = Dummy() + h = flow.ODict() + s = cStringIO.StringIO("testing") + assert proxy.read_http_body(s, d, h, False, None) == "" + + h["content-length"] = ["foo"] + s = cStringIO.StringIO("testing") + tutils.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, None) + + h["content-length"] = [5] + s = cStringIO.StringIO("testing") + assert len(proxy.read_http_body(s, d, h, False, None)) == 5 + s = cStringIO.StringIO("testing") + tutils.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, 4) + + h = flow.ODict() + s = cStringIO.StringIO("testing") + assert len(proxy.read_http_body(s, d, h, True, 4)) == 4 + s = cStringIO.StringIO("testing") + assert len(proxy.read_http_body(s, d, h, True, 100)) == 7 + + +class TestParseRequestLine: def test_simple(self): - libpry.raises(proxy.ProxyError, proxy.parse_request_line, "") + tutils.raises(proxy.ProxyError, proxy.parse_request_line, "") u = "GET ... HTTP/1.1" - libpry.raises("invalid url", proxy.parse_request_line, u) + tutils.raises("invalid url", proxy.parse_request_line, u) u = "GET http://foo.com:8888/test HTTP/1.1" m, s, h, po, pa, minor = proxy.parse_request_line(u) @@ -77,7 +74,7 @@ class u_parse_request_line(libpry.AutoTree): assert proxy.parse_request_line(u) == ('GET', None, None, None, '/', 1) -class uFileLike(libpry.AutoTree): +class TestFileLike: def test_wrap(self): s = cStringIO.StringIO("foobar\nfoobar") s = proxy.FileLike(s) @@ -88,14 +85,13 @@ class uFileLike(libpry.AutoTree): assert s.isatty - -class uProxyError(libpry.AutoTree): +class TestProxyError: def test_simple(self): p = proxy.ProxyError(111, "msg") assert repr(p) -class u_read_headers(libpry.AutoTree): +class TestReadHeaders: def test_read_simple(self): data = """ Header: one @@ -135,63 +131,45 @@ class u_read_headers(libpry.AutoTree): assert headers["header"] == ['one\r\n two'] -class u_parse_http_protocol(libpry.AutoTree): - def test_simple(self): - assert proxy.parse_http_protocol("HTTP/1.1") == (1, 1) - assert proxy.parse_http_protocol("HTTP/0.0") == (0, 0) - assert not proxy.parse_http_protocol("foo/0.0") +def test_parse_http_protocol(): + assert proxy.parse_http_protocol("HTTP/1.1") == (1, 1) + assert proxy.parse_http_protocol("HTTP/0.0") == (0, 0) + assert not proxy.parse_http_protocol("foo/0.0") -class u_parse_init_connect(libpry.AutoTree): - def test_simple(self): - assert proxy.parse_init_connect("CONNECT host.com:443 HTTP/1.0") - assert not proxy.parse_init_connect("bogus") - assert not proxy.parse_init_connect("GET host.com:443 HTTP/1.0") - assert not proxy.parse_init_connect("CONNECT host.com443 HTTP/1.0") - assert not proxy.parse_init_connect("CONNECT host.com:443 foo/1.0") +def test_parse_init_connect(): + assert proxy.parse_init_connect("CONNECT host.com:443 HTTP/1.0") + assert not proxy.parse_init_connect("bogus") + assert not proxy.parse_init_connect("GET host.com:443 HTTP/1.0") + assert not proxy.parse_init_connect("CONNECT host.com443 HTTP/1.0") + assert not proxy.parse_init_connect("CONNECT host.com:443 foo/1.0") -class u_parse_init_proxy(libpry.AutoTree): - def test_simple(self): - u = "GET http://foo.com:8888/test HTTP/1.1" - m, s, h, po, pa, major, minor = proxy.parse_init_proxy(u) - assert m == "GET" - assert s == "http" - assert h == "foo.com" - assert po == 8888 - assert pa == "/test" - assert major == 1 - assert minor == 1 +def test_prase_init_proxy(): + u = "GET http://foo.com:8888/test HTTP/1.1" + m, s, h, po, pa, major, minor = proxy.parse_init_proxy(u) + assert m == "GET" + assert s == "http" + assert h == "foo.com" + assert po == 8888 + assert pa == "/test" + assert major == 1 + assert minor == 1 - assert not proxy.parse_init_proxy("invalid") - assert not proxy.parse_init_proxy("GET invalid HTTP/1.1") - assert not proxy.parse_init_proxy("GET http://foo.com:8888/test foo/1.1") + assert not proxy.parse_init_proxy("invalid") + assert not proxy.parse_init_proxy("GET invalid HTTP/1.1") + assert not proxy.parse_init_proxy("GET http://foo.com:8888/test foo/1.1") -class u_parse_init_http(libpry.AutoTree): - def test_simple(self): - u = "GET /test HTTP/1.1" - m, u, major, minor = proxy.parse_init_http(u) - assert m == "GET" - assert u == "/test" - assert major == 1 - assert minor == 1 +def test_parse_init_http(): + u = "GET /test HTTP/1.1" + m, u, major, minor = proxy.parse_init_http(u) + assert m == "GET" + assert u == "/test" + assert major == 1 + assert minor == 1 - assert not proxy.parse_init_http("invalid") - assert not proxy.parse_init_http("GET invalid HTTP/1.1") - assert not proxy.parse_init_http("GET /test foo/1.1") + assert not proxy.parse_init_http("invalid") + assert not proxy.parse_init_http("GET invalid HTTP/1.1") + assert not proxy.parse_init_http("GET /test foo/1.1") - - -tests = [ - u_parse_http_protocol(), - u_parse_init_connect(), - u_parse_init_proxy(), - u_parse_init_http(), - uProxyError(), - uFileLike(), - u_parse_request_line(), - u_read_chunked(), - u_read_http_body(), - u_read_headers() -] diff --git a/test/test_script.py b/test/test_script.py index 94f036d9a..88f32ddf2 100644 --- a/test/test_script.py +++ b/test/test_script.py @@ -1,15 +1,14 @@ import os from libmproxy import script, flow -import libpry import tutils -class uScript(libpry.AutoTree): +class TestScript: def test_simple(self): s = flow.State() fm = flow.FlowMaster(None, s) ctx = flow.ScriptContext(fm) - p = script.Script(os.path.join("scripts", "a.py"), ctx) + p = script.Script(tutils.test_data.path("scripts/a.py"), ctx) p.load() assert "here" in p.ns assert p.run("here") == (True, 1) @@ -26,7 +25,7 @@ class uScript(libpry.AutoTree): def test_duplicate_flow(self): s = flow.State() fm = flow.FlowMaster(None, s) - fm.load_script(os.path.join("scripts", "duplicate_flow.py")) + fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py")) r = tutils.treq() fm.handle_request(r) assert fm.state.flow_count() == 2 @@ -40,32 +39,26 @@ class uScript(libpry.AutoTree): s = script.Script("nonexistent", ctx) - libpry.raises( + tutils.raises( "no such file", s.load ) - s = script.Script("scripts", ctx) - libpry.raises( + s = script.Script(tutils.test_data.path("scripts"), ctx) + tutils.raises( "not a file", s.load ) - s = script.Script("scripts/syntaxerr.py", ctx) - libpry.raises( + s = script.Script(tutils.test_data.path("scripts/syntaxerr.py"), ctx) + tutils.raises( script.ScriptError, s.load ) - s = script.Script("scripts/loaderr.py", ctx) - libpry.raises( + s = script.Script(tutils.test_data.path("scripts/loaderr.py"), ctx) + tutils.raises( script.ScriptError, s.load ) - - -tests = [ - uScript(), -] - diff --git a/test/test_wsgi.py b/test/test_wsgi.py index ab2e26682..1ae81d119 100644 --- a/test/test_wsgi.py +++ b/test/test_wsgi.py @@ -1,5 +1,4 @@ import cStringIO, sys -import libpry from libmproxy import wsgi import tutils @@ -16,7 +15,7 @@ class TestApp: return ['Hello', ' world!\n'] -class uWSGIAdaptor(libpry.AutoTree): +class TestWSGIAdaptor: def test_make_environ(self): w = wsgi.WSGIAdaptor(None, "foo", 80) tr = tutils.treq() @@ -97,7 +96,7 @@ class uWSGIAdaptor(libpry.AutoTree): assert "Internal Server Error" in self._serve(app) -class uAppRegistry(libpry.AutoTree): +class TestAppRegistry: def test_add_get(self): ar = wsgi.AppRegistry() ar.add("foo", "domain", 80) @@ -109,9 +108,3 @@ class uAppRegistry(libpry.AutoTree): r.port = 81 assert not ar.get(r) - - -tests = [ - uWSGIAdaptor(), - uAppRegistry() -] diff --git a/test/tutils.py b/test/tutils.py index 34ef928d6..19dbb1393 100644 --- a/test/tutils.py +++ b/test/tutils.py @@ -1,6 +1,8 @@ -import threading, Queue, unittest +import threading, Queue +import os, shutil,tempfile +from contextlib import contextmanager import libpry -from libmproxy import proxy, flow, controller +from libmproxy import proxy, flow, controller, utils import requests import libpathod.test import random @@ -128,6 +130,19 @@ class ProxTest: return pthread.tmaster.log + +@contextmanager +def tmpdir(*args, **kwargs): + orig_workdir = os.getcwd() + temp_workdir = tempfile.mkdtemp(*args, **kwargs) + os.chdir(temp_workdir) + + yield temp_workdir + + os.chdir(orig_workdir) + shutil.rmtree(temp_workdir) + + def raises(exc, obj, *args, **kwargs): """ Assert that a callable raises a specified exception. @@ -165,3 +180,5 @@ def raises(exc, obj, *args, **kwargs): ) ) raise AssertionError("No exception raised.") + +test_data = utils.Data(__name__)