From 1b045d24bccd68f6db1e15c655af192cb5217a6a Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Sun, 12 Mar 2017 22:55:22 +0100 Subject: [PATCH] nuke tutils.tmpdir, use pytest tmpdir --- mitmproxy/test/tutils.py | 18 +- test/mitmproxy/addons/test_clientplayback.py | 19 +-- test/mitmproxy/addons/test_replace.py | 56 +++---- test/mitmproxy/addons/test_script.py | 33 ++-- test/mitmproxy/addons/test_serverplayback.py | 19 +-- test/mitmproxy/addons/test_streamfile.py | 68 ++++---- test/mitmproxy/net/test_tcp.py | 29 ++-- test/mitmproxy/test_certs.py | 168 +++++++++---------- test/mitmproxy/test_examples.py | 49 +++--- test/mitmproxy/test_optmanager.py | 29 ++-- test/mitmproxy/tools/test_dump.py | 25 ++- test/pathod/language/test_base.py | 29 ++-- test/pathod/language/test_generators.py | 34 ++-- 13 files changed, 252 insertions(+), 324 deletions(-) diff --git a/mitmproxy/test/tutils.py b/mitmproxy/test/tutils.py index 7b3114927..80e5b6fd7 100644 --- a/mitmproxy/test/tutils.py +++ b/mitmproxy/test/tutils.py @@ -1,9 +1,5 @@ -from io import BytesIO -import tempfile -import os import time -import shutil -from contextlib import contextmanager +from io import BytesIO from mitmproxy.utils import data from mitmproxy.net import tcp @@ -13,18 +9,6 @@ from mitmproxy.net import http test_data = data.Data(__name__).push("../../test/") -@contextmanager -def tmpdir(*args, **kwargs): - orig_workdir = os.getcwd() - temp_workdir = tempfile.mkdtemp(*args, **kwargs) - os.chdir(temp_workdir) - - yield temp_workdir - - os.chdir(orig_workdir) - shutil.rmtree(temp_workdir) - - def treader(bytes): """ Construct a tcp.Read object from bytes. diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index 6b8b7c907..c22b35890 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -1,9 +1,7 @@ -import os import pytest from unittest import mock from mitmproxy.test import tflow -from mitmproxy.test import tutils from mitmproxy import io from mitmproxy import exceptions @@ -49,14 +47,13 @@ class TestClientPlayback: cp.tick() assert cp.current_thread is None - def test_configure(self): + def test_configure(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context() as tctx: - with tutils.tmpdir() as td: - path = os.path.join(td, "flows") - tdump(path, [tflow.tflow()]) - tctx.configure(cp, client_replay=[path]) - tctx.configure(cp, client_replay=[]) - tctx.configure(cp) - with pytest.raises(exceptions.OptionsError): - tctx.configure(cp, client_replay=["nonexistent"]) + path = str(tmpdir.join("flows")) + tdump(path, [tflow.tflow()]) + tctx.configure(cp, client_replay=[path]) + tctx.configure(cp, client_replay=[]) + tctx.configure(cp) + with pytest.raises(exceptions.OptionsError): + tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py index 8c280c51c..2311641a4 100644 --- a/test/mitmproxy/addons/test_replace.py +++ b/test/mitmproxy/addons/test_replace.py @@ -1,11 +1,9 @@ -import os.path import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils from .. import tservers from mitmproxy.addons import replace from mitmproxy.test import taddons +from mitmproxy.test import tflow class TestReplace: @@ -71,33 +69,31 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest): class TestReplaceFile: - def test_simple(self): + def test_simple(self, tmpdir): r = replace.ReplaceFile() - with tutils.tmpdir() as td: - rp = os.path.join(td, "replacement") - with open(rp, "w") as f: - f.write("bar") - with taddons.context() as tctx: - tctx.configure( - r, - replacement_files = [ - "/~q/foo/" + rp, - "/~s/foo/" + rp, - "/~b nonexistent/nonexistent/nonexistent", - ] - ) - f = tflow.tflow() - f.request.content = b"foo" - r.request(f) - assert f.request.content == b"bar" + rp = tmpdir.join("replacement") + rp.write("bar") + with taddons.context() as tctx: + tctx.configure( + r, + replacement_files = [ + "/~q/foo/" + str(rp), + "/~s/foo/" + str(rp), + "/~b nonexistent/nonexistent/nonexistent", + ] + ) + f = tflow.tflow() + f.request.content = b"foo" + r.request(f) + assert f.request.content == b"bar" - f = tflow.tflow(resp=True) - f.response.content = b"foo" - r.response(f) - assert f.response.content == b"bar" + f = tflow.tflow(resp=True) + f.response.content = b"foo" + r.response(f) + assert f.response.content == b"bar" - f = tflow.tflow() - f.request.content = b"nonexistent" - assert not tctx.master.event_log - r.request(f) - assert tctx.master.event_log + f = tflow.tflow() + f.request.content = b"nonexistent" + assert not tctx.master.event_log + r.request(f) + assert tctx.master.event_log diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 5f196ebf6..d79ed4ef8 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -68,13 +68,12 @@ class TestParseCommand: with pytest.raises(ValueError): script.parse_command(" ") - def test_no_script_file(self): + def test_no_script_file(self, tmpdir): with pytest.raises(Exception, match="not found"): script.parse_command("notfound") - with tutils.tmpdir() as dir: - with pytest.raises(Exception, match="Not a file"): - script.parse_command(dir) + with pytest.raises(Exception, match="Not a file"): + script.parse_command(str(tmpdir)) def test_parse_args(self): with utils.chdir(tutils.test_data.dirname): @@ -128,21 +127,19 @@ class TestScript: recf = sc.ns.call_log[0] assert recf[1] == "request" - def test_reload(self): + def test_reload(self, tmpdir): with taddons.context() as tctx: - with tutils.tmpdir(): - with open("foo.py", "w"): - pass - sc = script.Script("foo.py") - tctx.configure(sc) - for _ in range(100): - with open("foo.py", "a") as f: - f.write(".") - sc.tick() - time.sleep(0.1) - if tctx.master.event_log: - return - raise AssertionError("Change event not detected.") + f = tmpdir.join("foo.py") + f.ensure(file=True) + sc = script.Script(str(f)) + tctx.configure(sc) + for _ in range(100): + f.write(".") + sc.tick() + time.sleep(0.1) + if tctx.master.event_log: + return + raise AssertionError("Change event not detected.") def test_exception(self): with taddons.context() as tctx: diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index e2afa5166..54e4d281c 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,10 +1,8 @@ -import os import urllib import pytest -from mitmproxy.test import tutils -from mitmproxy.test import tflow from mitmproxy.test import taddons +from mitmproxy.test import tflow import mitmproxy.test.tutils from mitmproxy.addons import serverplayback @@ -19,15 +17,14 @@ def tdump(path, flows): w.add(i) -def test_config(): +def test_config(tmpdir): s = serverplayback.ServerPlayback() - with tutils.tmpdir() as p: - with taddons.context() as tctx: - fpath = os.path.join(p, "flows") - tdump(fpath, [tflow.tflow(resp=True)]) - tctx.configure(s, server_replay=[fpath]) - with pytest.raises(exceptions.OptionsError): - tctx.configure(s, server_replay=[p]) + with taddons.context() as tctx: + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + tctx.configure(s, server_replay=[fpath]) + with pytest.raises(exceptions.OptionsError): + tctx.configure(s, server_replay=[str(tmpdir)]) def test_tick(): diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py index 4105c1fc9..3f78521ce 100644 --- a/test/mitmproxy/addons/test_streamfile.py +++ b/test/mitmproxy/addons/test_streamfile.py @@ -1,9 +1,7 @@ -import os.path import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils from mitmproxy.test import taddons +from mitmproxy.test import tflow from mitmproxy import io from mitmproxy import exceptions @@ -11,19 +9,17 @@ from mitmproxy import options from mitmproxy.addons import streamfile -def test_configure(): +def test_configure(tmpdir): sa = streamfile.StreamFile() with taddons.context(options=options.Options()) as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=tdir) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure(sa, streamfile=p, filtstr="~~") - tctx.configure(sa, filtstr="foo") - assert sa.filt - tctx.configure(sa, filtstr=None) - assert not sa.filt + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, streamfile=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~") + tctx.configure(sa, filtstr="foo") + assert sa.filt + tctx.configure(sa, filtstr=None) + assert not sa.filt def rd(p): @@ -31,36 +27,34 @@ def rd(p): return list(x.stream()) -def test_tcp(): +def test_tcp(tmpdir): sa = streamfile.StreamFile() with taddons.context() as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") - tctx.configure(sa, streamfile=p) + p = str(tmpdir.join("foo")) + tctx.configure(sa, streamfile=p) - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, streamfile=None) + assert rd(p) -def test_simple(): +def test_simple(tmpdir): sa = streamfile.StreamFile() with taddons.context() as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") + p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) + tctx.configure(sa, streamfile=p) - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, streamfile=None) + assert rd(p)[0].response - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response + tctx.configure(sa, streamfile="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, streamfile=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index cf010f6e2..8b26784a5 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -11,8 +11,8 @@ from OpenSSL import SSL from mitmproxy import certs from mitmproxy.net import tcp -from mitmproxy.test import tutils from mitmproxy import exceptions +from mitmproxy.test import tutils from . import tservers from ...conftest import requires_alpn @@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase): cipher_list="AES256-SHA" ) - def test_log(self): + def test_log(self, tmpdir): testval = b"echo!\n" _logfun = tcp.log_ssl_key - with tutils.tmpdir() as d: - logfile = os.path.join(d, "foo", "bar", "logfile") - tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) + logfile = str(tmpdir.join("foo", "bar", "logfile")) + tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - c.finish() + c = tcp.TCPClient(("127.0.0.1", self.port)) + with c.connect(): + c.convert_to_ssl() + c.wfile.write(testval) + c.wfile.flush() + assert c.rfile.readline() == testval + c.finish() - tcp.log_ssl_key.close() - with open(logfile, "rb") as f: - assert f.read().count(b"CLIENT_RANDOM") == 2 + tcp.log_ssl_key.close() + with open(logfile, "rb") as f: + assert f.read().count(b"CLIENT_RANDOM") == 2 tcp.log_ssl_key = _logfun diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py index 9bd3ad25d..2d12c3702 100644 --- a/test/mitmproxy/test_certs.py +++ b/test/mitmproxy/test_certs.py @@ -34,118 +34,106 @@ from mitmproxy.test import tutils class TestCertStore: - def test_create_explicit(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(b"foo", []) + def test_create_explicit(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(b"foo", []) - ca2 = certs.CertStore.from_store(d, "test") - assert ca2.get_cert(b"foo", []) + ca2 = certs.CertStore.from_store(str(tmpdir), "test") + assert ca2.get_cert(b"foo", []) - assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - def test_create_no_common_name(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(None, [])[0].cn is None + def test_create_no_common_name(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(None, [])[0].cn is None - def test_create_tmp(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(b"foo.com", []) - assert ca.get_cert(b"foo.com", []) - assert ca.get_cert(b"*.foo.com", []) + def test_create_tmp(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(b"foo.com", []) + assert ca.get_cert(b"foo.com", []) + assert ca.get_cert(b"*.foo.com", []) - r = ca.get_cert(b"*.foo.com", []) - assert r[1] == ca.default_privatekey + r = ca.get_cert(b"*.foo.com", []) + assert r[1] == ca.default_privatekey - def test_sans(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) - ca.get_cert(b"foo.bar.com", []) - # assert c1 == c2 - c3 = ca.get_cert(b"bar.com", []) - assert not c1 == c3 + def test_sans(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) + ca.get_cert(b"foo.bar.com", []) + # assert c1 == c2 + c3 = ca.get_cert(b"bar.com", []) + assert not c1 == c3 - def test_sans_change(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - ca.get_cert(b"foo.com", [b"*.bar.com"]) - cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) - assert b"*.baz.com" in cert.altnames + def test_sans_change(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + ca.get_cert(b"foo.com", [b"*.bar.com"]) + cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) + assert b"*.baz.com" in cert.altnames - def test_expire(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - ca.STORE_CAP = 3 - ca.get_cert(b"one.com", []) - ca.get_cert(b"two.com", []) - ca.get_cert(b"three.com", []) + def test_expire(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + ca.STORE_CAP = 3 + ca.get_cert(b"one.com", []) + ca.get_cert(b"two.com", []) + ca.get_cert(b"three.com", []) - assert (b"one.com", ()) in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs - ca.get_cert(b"one.com", []) + ca.get_cert(b"one.com", []) - assert (b"one.com", ()) in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs - ca.get_cert(b"four.com", []) + ca.get_cert(b"four.com", []) - assert (b"one.com", ()) not in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - assert (b"four.com", ()) in ca.certs + assert (b"one.com", ()) not in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + assert (b"four.com", ()) in ca.certs - def test_overrides(self): - with tutils.tmpdir() as d: - ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test") - ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test") - assert not ca1.default_ca.get_serial_number( - ) == ca2.default_ca.get_serial_number() + def test_overrides(self, tmpdir): + ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test") + ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test") + assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) - dcp = os.path.join(d, "dc") - f = open(dcp, "wb") - f.write(dc[0].to_pem()) - f.close() - ca1.add_cert_file(b"foo.com", dcp) + dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) + dcp = tmpdir.join("dc") + dcp.write(dc[0].to_pem()) + ca1.add_cert_file(b"foo.com", str(dcp)) - ret = ca1.get_cert(b"foo.com", []) - assert ret[0].serial == dc[0].serial + ret = ca1.get_cert(b"foo.com", []) + assert ret[0].serial == dc[0].serial - def test_create_dhparams(self): - with tutils.tmpdir() as d: - filename = os.path.join(d, "dhparam.pem") - certs.CertStore.load_dhparam(filename) - assert os.path.exists(filename) + def test_create_dhparams(self, tmpdir): + filename = str(tmpdir.join("dhparam.pem")) + certs.CertStore.load_dhparam(filename) + assert os.path.exists(filename) class TestDummyCert: - def test_with_ca(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - r = certs.dummy_cert( - ca.default_privatekey, - ca.default_ca, - b"foo.com", - [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] - ) - assert r.cn == b"foo.com" - assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] + def test_with_ca(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + r = certs.dummy_cert( + ca.default_privatekey, + ca.default_ca, + b"foo.com", + [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] + ) + assert r.cn == b"foo.com" + assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] - r = certs.dummy_cert( - ca.default_privatekey, - ca.default_ca, - None, - [] - ) - assert r.cn is None - assert r.altnames == [] + r = certs.dummy_cert( + ca.default_privatekey, + ca.default_ca, + None, + [] + ) + assert r.cn is None + assert r.altnames == [] class TestSSLCert: diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index 668d0d4a2..f20e0c8cd 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,5 +1,4 @@ import json -import os import shlex import pytest @@ -142,30 +141,26 @@ class TestHARDump: with pytest.raises(ScriptError): tscript("complex/har_dump.py") - def test_simple(self): - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") + def test_simple(self, tmpdir): + path = str(tmpdir.join("somefile")) - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", self.flow()) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", self.flow()) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert len(har["log"]["entries"]) == 1 - def test_base64(self): - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") + def test_base64(self, tmpdir): + path = str(tmpdir.join("somefile")) - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" def test_format_cookies(self): @@ -187,7 +182,7 @@ class TestHARDump: f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] assert f['expires'] - def test_binary(self): + def test_binary(self, tmpdir): f = self.flow() f.request.method = "POST" @@ -196,14 +191,12 @@ class TestHARDump: f.response.headers["random-junk"] = bytes(range(256)) f.response.content = bytes(range(256)) - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") + path = str(tmpdir.join("somefile")) - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", f) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", f) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index db33cddd2..0ecfc4e5c 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -1,5 +1,4 @@ import copy -import os import pytest import typing import argparse @@ -7,7 +6,6 @@ import argparse from mitmproxy import options from mitmproxy import optmanager from mitmproxy import exceptions -from mitmproxy.test import tutils class TO(optmanager.OptManager): @@ -238,25 +236,24 @@ def test_serialize_defaults(): assert o.serialize(None, defaults=True) -def test_saving(): +def test_saving(tmpdir): o = TD2() o.three = "set" - with tutils.tmpdir() as tdir: - dst = os.path.join(tdir, "conf") - o.save(dst, defaults=True) + dst = str(tmpdir.join("conf")) + o.save(dst, defaults=True) - o2 = TD2() - o2.load_paths(dst) - o2.three = "foo" - o2.save(dst, defaults=True) + o2 = TD2() + o2.load_paths(dst) + o2.three = "foo" + o2.save(dst, defaults=True) + o.load_paths(dst) + assert o.three == "foo" + + with open(dst, 'a') as f: + f.write("foobar: '123'") + with pytest.raises(exceptions.OptionsError, matches=''): o.load_paths(dst) - assert o.three == "foo" - - with open(dst, 'a') as f: - f.write("foobar: '123'") - with pytest.raises(exceptions.OptionsError, matches=''): - o.load_paths(dst) def test_merge(): diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py index 2542ec4b0..a15bf5830 100644 --- a/test/mitmproxy/tools/test_dump.py +++ b/test/mitmproxy/tools/test_dump.py @@ -1,4 +1,3 @@ -import os import pytest from unittest import mock @@ -9,7 +8,6 @@ from mitmproxy import controller from mitmproxy import options from mitmproxy.tools import dump -from mitmproxy.test import tutils from .. import tservers @@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest): m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False) return m - def test_read(self): - with tutils.tmpdir() as t: - p = os.path.join(t, "read") - self.flowfile(p) - self.dummy_cycle( - self.mkmaster(None, rfile=p), - 1, b"", - ) - with pytest.raises(exceptions.OptionsError): - self.mkmaster(None, rfile="/nonexistent") - with pytest.raises(exceptions.OptionsError): - self.mkmaster(None, rfile="test_dump.py") + def test_read(self, tmpdir): + p = str(tmpdir.join("read")) + self.flowfile(p) + self.dummy_cycle( + self.mkmaster(None, rfile=p), + 1, b"", + ) + with pytest.raises(exceptions.OptionsError): + self.mkmaster(None, rfile="/nonexistent") + with pytest.raises(exceptions.OptionsError): + self.mkmaster(None, rfile="test_dump.py") def test_has_error(self): m = self.mkmaster(None) diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py index 85e9e53b5..ec460b079 100644 --- a/test/pathod/language/test_base.py +++ b/test/pathod/language/test_base.py @@ -1,11 +1,8 @@ -import os import pytest from pathod import language from pathod.language import base, exceptions -from mitmproxy.test import tutils - def parse_request(s): return language.parse_pathoc(s).next() @@ -137,24 +134,22 @@ class TestTokValueFile: v = base.TokValue.parseString("