nuke tutils.tmpdir, use pytest tmpdir

This commit is contained in:
Thomas Kriechbaumer 2017-03-12 22:55:22 +01:00
parent d069ba9da5
commit 1b045d24bc
13 changed files with 252 additions and 324 deletions

View File

@ -1,9 +1,5 @@
from io import BytesIO
import tempfile
import os
import time
import shutil
from contextlib import contextmanager
from io import BytesIO
from mitmproxy.utils import data
from mitmproxy.net import tcp
@ -13,18 +9,6 @@ from mitmproxy.net import http
test_data = data.Data(__name__).push("../../test/")
@contextmanager
def tmpdir(*args, **kwargs):
orig_workdir = os.getcwd()
temp_workdir = tempfile.mkdtemp(*args, **kwargs)
os.chdir(temp_workdir)
yield temp_workdir
os.chdir(orig_workdir)
shutil.rmtree(temp_workdir)
def treader(bytes):
"""
Construct a tcp.Read object from bytes.

View File

@ -1,9 +1,7 @@
import os
import pytest
from unittest import mock
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy import io
from mitmproxy import exceptions
@ -49,14 +47,13 @@ class TestClientPlayback:
cp.tick()
assert cp.current_thread is None
def test_configure(self):
def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context() as tctx:
with tutils.tmpdir() as td:
path = os.path.join(td, "flows")
tdump(path, [tflow.tflow()])
tctx.configure(cp, client_replay=[path])
tctx.configure(cp, client_replay=[])
tctx.configure(cp)
with pytest.raises(exceptions.OptionsError):
tctx.configure(cp, client_replay=["nonexistent"])
path = str(tmpdir.join("flows"))
tdump(path, [tflow.tflow()])
tctx.configure(cp, client_replay=[path])
tctx.configure(cp, client_replay=[])
tctx.configure(cp)
with pytest.raises(exceptions.OptionsError):
tctx.configure(cp, client_replay=["nonexistent"])

View File

@ -1,11 +1,9 @@
import os.path
import pytest
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .. import tservers
from mitmproxy.addons import replace
from mitmproxy.test import taddons
from mitmproxy.test import tflow
class TestReplace:
@ -71,33 +69,31 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest):
class TestReplaceFile:
def test_simple(self):
def test_simple(self, tmpdir):
r = replace.ReplaceFile()
with tutils.tmpdir() as td:
rp = os.path.join(td, "replacement")
with open(rp, "w") as f:
f.write("bar")
with taddons.context() as tctx:
tctx.configure(
r,
replacement_files = [
"/~q/foo/" + rp,
"/~s/foo/" + rp,
"/~b nonexistent/nonexistent/nonexistent",
]
)
f = tflow.tflow()
f.request.content = b"foo"
r.request(f)
assert f.request.content == b"bar"
rp = tmpdir.join("replacement")
rp.write("bar")
with taddons.context() as tctx:
tctx.configure(
r,
replacement_files = [
"/~q/foo/" + str(rp),
"/~s/foo/" + str(rp),
"/~b nonexistent/nonexistent/nonexistent",
]
)
f = tflow.tflow()
f.request.content = b"foo"
r.request(f)
assert f.request.content == b"bar"
f = tflow.tflow(resp=True)
f.response.content = b"foo"
r.response(f)
assert f.response.content == b"bar"
f = tflow.tflow(resp=True)
f.response.content = b"foo"
r.response(f)
assert f.response.content == b"bar"
f = tflow.tflow()
f.request.content = b"nonexistent"
assert not tctx.master.event_log
r.request(f)
assert tctx.master.event_log
f = tflow.tflow()
f.request.content = b"nonexistent"
assert not tctx.master.event_log
r.request(f)
assert tctx.master.event_log

View File

@ -68,13 +68,12 @@ class TestParseCommand:
with pytest.raises(ValueError):
script.parse_command(" ")
def test_no_script_file(self):
def test_no_script_file(self, tmpdir):
with pytest.raises(Exception, match="not found"):
script.parse_command("notfound")
with tutils.tmpdir() as dir:
with pytest.raises(Exception, match="Not a file"):
script.parse_command(dir)
with pytest.raises(Exception, match="Not a file"):
script.parse_command(str(tmpdir))
def test_parse_args(self):
with utils.chdir(tutils.test_data.dirname):
@ -128,21 +127,19 @@ class TestScript:
recf = sc.ns.call_log[0]
assert recf[1] == "request"
def test_reload(self):
def test_reload(self, tmpdir):
with taddons.context() as tctx:
with tutils.tmpdir():
with open("foo.py", "w"):
pass
sc = script.Script("foo.py")
tctx.configure(sc)
for _ in range(100):
with open("foo.py", "a") as f:
f.write(".")
sc.tick()
time.sleep(0.1)
if tctx.master.event_log:
return
raise AssertionError("Change event not detected.")
f = tmpdir.join("foo.py")
f.ensure(file=True)
sc = script.Script(str(f))
tctx.configure(sc)
for _ in range(100):
f.write(".")
sc.tick()
time.sleep(0.1)
if tctx.master.event_log:
return
raise AssertionError("Change event not detected.")
def test_exception(self):
with taddons.context() as tctx:

View File

@ -1,10 +1,8 @@
import os
import urllib
import pytest
from mitmproxy.test import tutils
from mitmproxy.test import tflow
from mitmproxy.test import taddons
from mitmproxy.test import tflow
import mitmproxy.test.tutils
from mitmproxy.addons import serverplayback
@ -19,15 +17,14 @@ def tdump(path, flows):
w.add(i)
def test_config():
def test_config(tmpdir):
s = serverplayback.ServerPlayback()
with tutils.tmpdir() as p:
with taddons.context() as tctx:
fpath = os.path.join(p, "flows")
tdump(fpath, [tflow.tflow(resp=True)])
tctx.configure(s, server_replay=[fpath])
with pytest.raises(exceptions.OptionsError):
tctx.configure(s, server_replay=[p])
with taddons.context() as tctx:
fpath = str(tmpdir.join("flows"))
tdump(fpath, [tflow.tflow(resp=True)])
tctx.configure(s, server_replay=[fpath])
with pytest.raises(exceptions.OptionsError):
tctx.configure(s, server_replay=[str(tmpdir)])
def test_tick():

View File

@ -1,9 +1,7 @@
import os.path
import pytest
from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
@ -11,19 +9,17 @@ from mitmproxy import options
from mitmproxy.addons import streamfile
def test_configure():
def test_configure(tmpdir):
sa = streamfile.StreamFile()
with taddons.context(options=options.Options()) as tctx:
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, streamfile=tdir)
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(sa, streamfile=p, filtstr="~~")
tctx.configure(sa, filtstr="foo")
assert sa.filt
tctx.configure(sa, filtstr=None)
assert not sa.filt
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, streamfile=str(tmpdir))
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~")
tctx.configure(sa, filtstr="foo")
assert sa.filt
tctx.configure(sa, filtstr=None)
assert not sa.filt
def rd(p):
@ -31,36 +27,34 @@ def rd(p):
return list(x.stream())
def test_tcp():
def test_tcp(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
tctx.configure(sa, streamfile=p)
p = str(tmpdir.join("foo"))
tctx.configure(sa, streamfile=p)
tt = tflow.ttcpflow()
sa.tcp_start(tt)
sa.tcp_end(tt)
tctx.configure(sa, streamfile=None)
assert rd(p)
tt = tflow.ttcpflow()
sa.tcp_start(tt)
sa.tcp_end(tt)
tctx.configure(sa, streamfile=None)
assert rd(p)
def test_simple():
def test_simple(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
p = str(tmpdir.join("foo"))
tctx.configure(sa, streamfile=p)
tctx.configure(sa, streamfile=p)
f = tflow.tflow(resp=True)
sa.request(f)
sa.response(f)
tctx.configure(sa, streamfile=None)
assert rd(p)[0].response
f = tflow.tflow(resp=True)
sa.request(f)
sa.response(f)
tctx.configure(sa, streamfile=None)
assert rd(p)[0].response
tctx.configure(sa, streamfile="+" + p)
f = tflow.tflow()
sa.request(f)
tctx.configure(sa, streamfile=None)
assert not rd(p)[1].response
tctx.configure(sa, streamfile="+" + p)
f = tflow.tflow()
sa.request(f)
tctx.configure(sa, streamfile=None)
assert not rd(p)[1].response

View File

@ -11,8 +11,8 @@ from OpenSSL import SSL
from mitmproxy import certs
from mitmproxy.net import tcp
from mitmproxy.test import tutils
from mitmproxy import exceptions
from mitmproxy.test import tutils
from . import tservers
from ...conftest import requires_alpn
@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
cipher_list="AES256-SHA"
)
def test_log(self):
def test_log(self, tmpdir):
testval = b"echo!\n"
_logfun = tcp.log_ssl_key
with tutils.tmpdir() as d:
logfile = os.path.join(d, "foo", "bar", "logfile")
tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
logfile = str(tmpdir.join("foo", "bar", "logfile"))
tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
c.finish()
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
c.finish()
tcp.log_ssl_key.close()
with open(logfile, "rb") as f:
assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key.close()
with open(logfile, "rb") as f:
assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key = _logfun

View File

@ -34,118 +34,106 @@ from mitmproxy.test import tutils
class TestCertStore:
def test_create_explicit(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
assert ca.get_cert(b"foo", [])
def test_create_explicit(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
assert ca.get_cert(b"foo", [])
ca2 = certs.CertStore.from_store(d, "test")
assert ca2.get_cert(b"foo", [])
ca2 = certs.CertStore.from_store(str(tmpdir), "test")
assert ca2.get_cert(b"foo", [])
assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
def test_create_no_common_name(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
assert ca.get_cert(None, [])[0].cn is None
def test_create_no_common_name(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
assert ca.get_cert(None, [])[0].cn is None
def test_create_tmp(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
assert ca.get_cert(b"foo.com", [])
assert ca.get_cert(b"foo.com", [])
assert ca.get_cert(b"*.foo.com", [])
def test_create_tmp(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
assert ca.get_cert(b"foo.com", [])
assert ca.get_cert(b"foo.com", [])
assert ca.get_cert(b"*.foo.com", [])
r = ca.get_cert(b"*.foo.com", [])
assert r[1] == ca.default_privatekey
r = ca.get_cert(b"*.foo.com", [])
assert r[1] == ca.default_privatekey
def test_sans(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
ca.get_cert(b"foo.bar.com", [])
# assert c1 == c2
c3 = ca.get_cert(b"bar.com", [])
assert not c1 == c3
def test_sans(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
ca.get_cert(b"foo.bar.com", [])
# assert c1 == c2
c3 = ca.get_cert(b"bar.com", [])
assert not c1 == c3
def test_sans_change(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
ca.get_cert(b"foo.com", [b"*.bar.com"])
cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
assert b"*.baz.com" in cert.altnames
def test_sans_change(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
ca.get_cert(b"foo.com", [b"*.bar.com"])
cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
assert b"*.baz.com" in cert.altnames
def test_expire(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
ca.STORE_CAP = 3
ca.get_cert(b"one.com", [])
ca.get_cert(b"two.com", [])
ca.get_cert(b"three.com", [])
def test_expire(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
ca.STORE_CAP = 3
ca.get_cert(b"one.com", [])
ca.get_cert(b"two.com", [])
ca.get_cert(b"three.com", [])
assert (b"one.com", ()) in ca.certs
assert (b"two.com", ()) in ca.certs
assert (b"three.com", ()) in ca.certs
assert (b"one.com", ()) in ca.certs
assert (b"two.com", ()) in ca.certs
assert (b"three.com", ()) in ca.certs
ca.get_cert(b"one.com", [])
ca.get_cert(b"one.com", [])
assert (b"one.com", ()) in ca.certs
assert (b"two.com", ()) in ca.certs
assert (b"three.com", ()) in ca.certs
assert (b"one.com", ()) in ca.certs
assert (b"two.com", ()) in ca.certs
assert (b"three.com", ()) in ca.certs
ca.get_cert(b"four.com", [])
ca.get_cert(b"four.com", [])
assert (b"one.com", ()) not in ca.certs
assert (b"two.com", ()) in ca.certs
assert (b"three.com", ()) in ca.certs
assert (b"four.com", ()) in ca.certs
assert (b"one.com", ()) not in ca.certs
assert (b"two.com", ()) in ca.certs
assert (b"three.com", ()) in ca.certs
assert (b"four.com", ()) in ca.certs
def test_overrides(self):
with tutils.tmpdir() as d:
ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test")
ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test")
assert not ca1.default_ca.get_serial_number(
) == ca2.default_ca.get_serial_number()
def test_overrides(self, tmpdir):
ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test")
ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test")
assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
dcp = os.path.join(d, "dc")
f = open(dcp, "wb")
f.write(dc[0].to_pem())
f.close()
ca1.add_cert_file(b"foo.com", dcp)
dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
dcp = tmpdir.join("dc")
dcp.write(dc[0].to_pem())
ca1.add_cert_file(b"foo.com", str(dcp))
ret = ca1.get_cert(b"foo.com", [])
assert ret[0].serial == dc[0].serial
ret = ca1.get_cert(b"foo.com", [])
assert ret[0].serial == dc[0].serial
def test_create_dhparams(self):
with tutils.tmpdir() as d:
filename = os.path.join(d, "dhparam.pem")
certs.CertStore.load_dhparam(filename)
assert os.path.exists(filename)
def test_create_dhparams(self, tmpdir):
filename = str(tmpdir.join("dhparam.pem"))
certs.CertStore.load_dhparam(filename)
assert os.path.exists(filename)
class TestDummyCert:
def test_with_ca(self):
with tutils.tmpdir() as d:
ca = certs.CertStore.from_store(d, "test")
r = certs.dummy_cert(
ca.default_privatekey,
ca.default_ca,
b"foo.com",
[b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
)
assert r.cn == b"foo.com"
assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
def test_with_ca(self, tmpdir):
ca = certs.CertStore.from_store(str(tmpdir), "test")
r = certs.dummy_cert(
ca.default_privatekey,
ca.default_ca,
b"foo.com",
[b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
)
assert r.cn == b"foo.com"
assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
r = certs.dummy_cert(
ca.default_privatekey,
ca.default_ca,
None,
[]
)
assert r.cn is None
assert r.altnames == []
r = certs.dummy_cert(
ca.default_privatekey,
ca.default_ca,
None,
[]
)
assert r.cn is None
assert r.altnames == []
class TestSSLCert:

View File

@ -1,5 +1,4 @@
import json
import os
import shlex
import pytest
@ -142,30 +141,26 @@ class TestHARDump:
with pytest.raises(ScriptError):
tscript("complex/har_dump.py")
def test_simple(self):
with tutils.tmpdir() as tdir:
path = os.path.join(tdir, "somefile")
def test_simple(self, tmpdir):
path = str(tmpdir.join("somefile"))
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", self.flow())
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", self.flow())
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
assert len(har["log"]["entries"]) == 1
def test_base64(self):
with tutils.tmpdir() as tdir:
path = os.path.join(tdir, "somefile")
def test_base64(self, tmpdir):
path = str(tmpdir.join("somefile"))
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
def test_format_cookies(self):
@ -187,7 +182,7 @@ class TestHARDump:
f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
assert f['expires']
def test_binary(self):
def test_binary(self, tmpdir):
f = self.flow()
f.request.method = "POST"
@ -196,14 +191,12 @@ class TestHARDump:
f.response.headers["random-junk"] = bytes(range(256))
f.response.content = bytes(range(256))
with tutils.tmpdir() as tdir:
path = os.path.join(tdir, "somefile")
path = str(tmpdir.join("somefile"))
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", f)
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
m, sc = tscript("complex/har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", f)
m.addons.remove(sc)
with open(path, "r") as inp:
har = json.load(inp)
assert len(har["log"]["entries"]) == 1

View File

@ -1,5 +1,4 @@
import copy
import os
import pytest
import typing
import argparse
@ -7,7 +6,6 @@ import argparse
from mitmproxy import options
from mitmproxy import optmanager
from mitmproxy import exceptions
from mitmproxy.test import tutils
class TO(optmanager.OptManager):
@ -238,25 +236,24 @@ def test_serialize_defaults():
assert o.serialize(None, defaults=True)
def test_saving():
def test_saving(tmpdir):
o = TD2()
o.three = "set"
with tutils.tmpdir() as tdir:
dst = os.path.join(tdir, "conf")
o.save(dst, defaults=True)
dst = str(tmpdir.join("conf"))
o.save(dst, defaults=True)
o2 = TD2()
o2.load_paths(dst)
o2.three = "foo"
o2.save(dst, defaults=True)
o2 = TD2()
o2.load_paths(dst)
o2.three = "foo"
o2.save(dst, defaults=True)
o.load_paths(dst)
assert o.three == "foo"
with open(dst, 'a') as f:
f.write("foobar: '123'")
with pytest.raises(exceptions.OptionsError, matches=''):
o.load_paths(dst)
assert o.three == "foo"
with open(dst, 'a') as f:
f.write("foobar: '123'")
with pytest.raises(exceptions.OptionsError, matches=''):
o.load_paths(dst)
def test_merge():

View File

@ -1,4 +1,3 @@
import os
import pytest
from unittest import mock
@ -9,7 +8,6 @@ from mitmproxy import controller
from mitmproxy import options
from mitmproxy.tools import dump
from mitmproxy.test import tutils
from .. import tservers
@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest):
m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)
return m
def test_read(self):
with tutils.tmpdir() as t:
p = os.path.join(t, "read")
self.flowfile(p)
self.dummy_cycle(
self.mkmaster(None, rfile=p),
1, b"",
)
with pytest.raises(exceptions.OptionsError):
self.mkmaster(None, rfile="/nonexistent")
with pytest.raises(exceptions.OptionsError):
self.mkmaster(None, rfile="test_dump.py")
def test_read(self, tmpdir):
p = str(tmpdir.join("read"))
self.flowfile(p)
self.dummy_cycle(
self.mkmaster(None, rfile=p),
1, b"",
)
with pytest.raises(exceptions.OptionsError):
self.mkmaster(None, rfile="/nonexistent")
with pytest.raises(exceptions.OptionsError):
self.mkmaster(None, rfile="test_dump.py")
def test_has_error(self):
m = self.mkmaster(None)

View File

@ -1,11 +1,8 @@
import os
import pytest
from pathod import language
from pathod.language import base, exceptions
from mitmproxy.test import tutils
def parse_request(s):
return language.parse_pathoc(s).next()
@ -137,24 +134,22 @@ class TestTokValueFile:
v = base.TokValue.parseString("<path")[0]
assert v.path == "path"
def test_access_control(self):
def test_access_control(self, tmpdir):
v = base.TokValue.parseString("<path")[0]
with tutils.tmpdir() as t:
p = os.path.join(t, "path")
with open(p, "wb") as f:
f.write(b"x" * 10000)
f = tmpdir.join("path")
f.write(b"x" * 10000)
assert v.get_generator(language.Settings(staticdir=t))
assert v.get_generator(language.Settings(staticdir=str(tmpdir)))
v = base.TokValue.parseString("<path2")[0]
with pytest.raises(exceptions.FileAccessDenied):
v.get_generator(language.Settings(staticdir=t))
with pytest.raises(Exception, match="access disabled"):
v.get_generator(language.Settings())
v = base.TokValue.parseString("<path2")[0]
with pytest.raises(exceptions.FileAccessDenied):
v.get_generator(language.Settings(staticdir=str(tmpdir)))
with pytest.raises(Exception, match="access disabled"):
v.get_generator(language.Settings())
v = base.TokValue.parseString("</outside")[0]
with pytest.raises(Exception, match="outside"):
v.get_generator(language.Settings(staticdir=t))
v = base.TokValue.parseString("</outside")[0]
with pytest.raises(Exception, match="outside"):
v.get_generator(language.Settings(staticdir=str(tmpdir)))
def test_spec(self):
v = base.TokValue.parseString("<'one two'")[0]

View File

@ -1,7 +1,4 @@
import os
from pathod.language import generators
from mitmproxy.test import tutils
def test_randomgenerator():
@ -15,23 +12,20 @@ def test_randomgenerator():
assert len(g[1000:1001]) == 0
def test_filegenerator():
with tutils.tmpdir() as t:
path = os.path.join(t, "foo")
f = open(path, "wb")
f.write(b"x" * 10000)
f.close()
g = generators.FileGenerator(path)
assert len(g) == 10000
assert g[0] == b"x"
assert g[-1] == b"x"
assert g[0:5] == b"xxxxx"
assert len(g[1:10]) == 9
assert len(g[10000:10001]) == 0
assert repr(g)
# remove all references to FileGenerator instance to close the file
# handle.
del g
def test_filegenerator(tmpdir):
f = tmpdir.join("foo")
f.write(b"x" * 10000)
g = generators.FileGenerator(str(f))
assert len(g) == 10000
assert g[0] == b"x"
assert g[-1] == b"x"
assert g[0:5] == b"xxxxx"
assert len(g[1:10]) == 9
assert len(g[10000:10001]) == 0
assert repr(g)
# remove all references to FileGenerator instance to close the file
# handle.
del g
def test_transform_generator():