test: shift test_data out of our public API

This commit is contained in:
Aldo Cortesi 2018-04-23 11:05:58 +12:00 committed by Aldo Cortesi
parent 0ba10b6109
commit 95e690ba31
33 changed files with 217 additions and 230 deletions

View File

@ -1,13 +1,9 @@
from io import BytesIO from io import BytesIO
from mitmproxy.utils import data
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy.net import http from mitmproxy.net import http
test_data = data.Data(__name__).push("../../test/")
def treader(bytes): def treader(bytes):
""" """
Construct a tcp.Read object from bytes. Construct a tcp.Read object from bytes.

View File

@ -1,6 +1,8 @@
import os import os
import socket import socket
from mitmproxy.utils import data
import pytest import pytest
pytest_plugins = ('test.full_coverage_plugin',) pytest_plugins = ('test.full_coverage_plugin',)
@ -33,3 +35,8 @@ skip_no_ipv6 = pytest.mark.skipif(
no_ipv6, no_ipv6,
reason='Host has no IPv6 support' reason='Host has no IPv6 support'
) )
@pytest.fixture()
def tdata():
return data.Data(__name__)

View File

@ -6,27 +6,25 @@ from mitmproxy.net.http import Headers
from ..mitmproxy import tservers from ..mitmproxy import tservers
example_dir = tutils.test_data.push("../examples")
class TestScripts(tservers.MasterTest): class TestScripts(tservers.MasterTest):
def test_add_header(self): def test_add_header(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
a = tctx.script(example_dir.path("simple/add_header.py")) a = tctx.script(tdata.path("../examples/simple/add_header.py"))
f = tflow.tflow(resp=tutils.tresp()) f = tflow.tflow(resp=tutils.tresp())
a.response(f) a.response(f)
assert f.response.headers["newheader"] == "foo" assert f.response.headers["newheader"] == "foo"
def test_custom_contentviews(self): def test_custom_contentviews(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.script(example_dir.path("simple/custom_contentview.py")) tctx.script(tdata.path("../examples/simple/custom_contentview.py"))
swapcase = contentviews.get("swapcase") swapcase = contentviews.get("swapcase")
_, fmt = swapcase(b"<html>Test!</html>") _, fmt = swapcase(b"<html>Test!</html>")
assert any(b'tEST!' in val[0][1] for val in fmt) assert any(b'tEST!' in val[0][1] for val in fmt)
def test_iframe_injector(self): def test_iframe_injector(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script(example_dir.path("simple/modify_body_inject_iframe.py")) sc = tctx.script(tdata.path("../examples/simple/modify_body_inject_iframe.py"))
tctx.configure( tctx.configure(
sc, sc,
iframe = "http://example.org/evil_iframe" iframe = "http://example.org/evil_iframe"
@ -38,9 +36,9 @@ class TestScripts(tservers.MasterTest):
content = f.response.content content = f.response.content
assert b'iframe' in content and b'evil_iframe' in content assert b'iframe' in content and b'evil_iframe' in content
def test_modify_form(self): def test_modify_form(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script(example_dir.path("simple/modify_form.py")) sc = tctx.script(tdata.path("../examples/simple/modify_form.py"))
form_header = Headers(content_type="application/x-www-form-urlencoded") form_header = Headers(content_type="application/x-www-form-urlencoded")
f = tflow.tflow(req=tutils.treq(headers=form_header)) f = tflow.tflow(req=tutils.treq(headers=form_header))
@ -52,9 +50,9 @@ class TestScripts(tservers.MasterTest):
sc.request(f) sc.request(f)
assert list(f.request.urlencoded_form.items()) == [("foo", "bar")] assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
def test_modify_querystring(self): def test_modify_querystring(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script(example_dir.path("simple/modify_querystring.py")) sc = tctx.script(tdata.path("../examples/simple/modify_querystring.py"))
f = tflow.tflow(req=tutils.treq(path="/search?q=term")) f = tflow.tflow(req=tutils.treq(path="/search?q=term"))
sc.request(f) sc.request(f)
@ -64,23 +62,23 @@ class TestScripts(tservers.MasterTest):
sc.request(f) sc.request(f)
assert f.request.query["mitmproxy"] == "rocks" assert f.request.query["mitmproxy"] == "rocks"
def test_redirect_requests(self): def test_redirect_requests(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script(example_dir.path("simple/redirect_requests.py")) sc = tctx.script(tdata.path("../examples/simple/redirect_requests.py"))
f = tflow.tflow(req=tutils.treq(host="example.org")) f = tflow.tflow(req=tutils.treq(host="example.org"))
sc.request(f) sc.request(f)
assert f.request.host == "mitmproxy.org" assert f.request.host == "mitmproxy.org"
def test_send_reply_from_proxy(self): def test_send_reply_from_proxy(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script(example_dir.path("simple/send_reply_from_proxy.py")) sc = tctx.script(tdata.path("../examples/simple/send_reply_from_proxy.py"))
f = tflow.tflow(req=tutils.treq(host="example.com", port=80)) f = tflow.tflow(req=tutils.treq(host="example.com", port=80))
sc.request(f) sc.request(f)
assert f.response.content == b"Hello World" assert f.response.content == b"Hello World"
def test_dns_spoofing(self): def test_dns_spoofing(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script(example_dir.path("complex/dns_spoofing.py")) sc = tctx.script(tdata.path("../examples/complex/dns_spoofing.py"))
original_host = "example.com" original_host = "example.com"

View File

@ -5,8 +5,6 @@ from mitmproxy.test import tutils
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.net.http import cookies from mitmproxy.net.http import cookies
example_dir = tutils.test_data.push("../examples")
class TestHARDump: class TestHARDump:
def flow(self, resp_content=b'message'): def flow(self, resp_content=b'message'):
@ -21,9 +19,9 @@ class TestHARDump:
resp=tutils.tresp(content=resp_content, **times) resp=tutils.tresp(content=resp_content, **times)
) )
def test_simple(self, tmpdir): def test_simple(self, tmpdir, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/har_dump.py")) a = tctx.script(tdata.path("../examples/complex/har_dump.py"))
path = str(tmpdir.join("somefile")) path = str(tmpdir.join("somefile"))
tctx.configure(a, hardump=path) tctx.configure(a, hardump=path)
tctx.invoke(a, "response", self.flow()) tctx.invoke(a, "response", self.flow())
@ -32,9 +30,9 @@ class TestHARDump:
har = json.load(inp) har = json.load(inp)
assert len(har["log"]["entries"]) == 1 assert len(har["log"]["entries"]) == 1
def test_base64(self, tmpdir): def test_base64(self, tmpdir, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/har_dump.py")) a = tctx.script(tdata.path("../examples/complex/har_dump.py"))
path = str(tmpdir.join("somefile")) path = str(tmpdir.join("somefile"))
tctx.configure(a, hardump=path) tctx.configure(a, hardump=path)
@ -46,9 +44,9 @@ class TestHARDump:
har = json.load(inp) har = json.load(inp)
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
def test_format_cookies(self): def test_format_cookies(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/har_dump.py")) a = tctx.script(tdata.path("../examples/complex/har_dump.py"))
CA = cookies.CookieAttrs CA = cookies.CookieAttrs
@ -65,9 +63,9 @@ class TestHARDump:
f = a.format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] f = a.format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
assert f['expires'] assert f['expires']
def test_binary(self, tmpdir): def test_binary(self, tmpdir, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
a = tctx.script(example_dir.path("complex/har_dump.py")) a = tctx.script(tdata.path("../examples/complex/har_dump.py"))
path = str(tmpdir.join("somefile")) path = str(tmpdir.join("somefile"))
tctx.configure(a, hardump=path) tctx.configure(a, hardump=path)

View File

@ -3,7 +3,6 @@ from unittest import mock
from mitmproxy.addons import core from mitmproxy.addons import core
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy import exceptions from mitmproxy import exceptions
import pytest import pytest
@ -198,13 +197,13 @@ def test_validation_modes(m):
tctx.configure(sa, mode = "reverse:") tctx.configure(sa, mode = "reverse:")
def test_client_certs(): def test_client_certs(tdata):
sa = core.Core() sa = core.Core()
with taddons.context() as tctx: with taddons.context() as tctx:
# Folders should work. # Folders should work.
tctx.configure(sa, client_certs = tutils.test_data.path("mitmproxy/data/clientcert")) tctx.configure(sa, client_certs = tdata.path("mitmproxy/data/clientcert"))
# Files, too. # Files, too.
tctx.configure(sa, client_certs = tutils.test_data.path("mitmproxy/data/clientcert/client.pem")) tctx.configure(sa, client_certs = tdata.path("mitmproxy/data/clientcert/client.pem"))
with pytest.raises(exceptions.OptionsError, match="certificate path does not exist"): with pytest.raises(exceptions.OptionsError, match="certificate path does not exist"):
tctx.configure(sa, client_certs = "invalid") tctx.configure(sa, client_certs = "invalid")

View File

@ -5,13 +5,12 @@ from mitmproxy import exceptions
from mitmproxy import certs from mitmproxy import certs
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils
import pytest import pytest
import pyperclip import pyperclip
from unittest import mock from unittest import mock
def test_extract(): def test_extract(tdata):
tf = tflow.tflow(resp=True) tf = tflow.tflow(resp=True)
tests = [ tests = [
["request.method", "GET"], ["request.method", "GET"],
@ -54,7 +53,7 @@ def test_extract():
ret = cut.extract(spec, tf) ret = cut.extract(spec, tf)
assert spec and ret == expected assert spec and ret == expected
with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f: with open(tdata.path("mitmproxy/net/data/text_cert"), "rb") as f:
d = f.read() d = f.read()
c1 = certs.Cert.from_pem(d) c1 = certs.Cert.from_pem(d)
tf.server_conn.cert = c1 tf.server_conn.cert = c1

View File

@ -7,7 +7,6 @@ from mitmproxy import exceptions
from mitmproxy.addons import proxyauth from mitmproxy.addons import proxyauth
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils
class TestMkauth: class TestMkauth:
@ -73,7 +72,7 @@ class TestProxyAuth:
assert resp.status_code == expected_status_code assert resp.status_code == expected_status_code
assert expected_header in resp.headers.keys() assert expected_header in resp.headers.keys()
def test_check(self): def test_check(self, tdata):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context(up) as ctx: with taddons.context(up) as ctx:
ctx.configure(up, proxyauth="any", mode="regular") ctx.configure(up, proxyauth="any", mode="regular")
@ -102,7 +101,7 @@ class TestProxyAuth:
ctx.configure( ctx.configure(
up, up,
proxyauth="@" + tutils.test_data.path( proxyauth="@" + tdata.path(
"mitmproxy/net/data/htpasswd" "mitmproxy/net/data/htpasswd"
) )
) )
@ -163,7 +162,7 @@ class TestProxyAuth:
assert not f.response assert not f.response
assert not f.request.headers.get("Authorization") assert not f.request.headers.get("Authorization")
def test_configure(self): def test_configure(self, tdata):
up = proxyauth.ProxyAuth() up = proxyauth.ProxyAuth()
with taddons.context(up) as ctx: with taddons.context(up) as ctx:
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
@ -199,14 +198,14 @@ class TestProxyAuth:
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
ctx.configure( ctx.configure(
up, up,
proxyauth= "@" + tutils.test_data.path("mitmproxy/net/data/server.crt") proxyauth= "@" + tdata.path("mitmproxy/net/data/server.crt")
) )
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
ctx.configure(up, proxyauth="@nonexistent") ctx.configure(up, proxyauth="@nonexistent")
ctx.configure( ctx.configure(
up, up,
proxyauth= "@" + tutils.test_data.path( proxyauth= "@" + tdata.path(
"mitmproxy/net/data/htpasswd" "mitmproxy/net/data/htpasswd"
) )
) )

View File

@ -10,7 +10,6 @@ from mitmproxy import exceptions
from mitmproxy.addons import script from mitmproxy.addons import script
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils
# We want this to be speedy for testing # We want this to be speedy for testing
@ -18,10 +17,10 @@ script.ReloadInterval = 0.1
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_load_script(): async def test_load_script(tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
ns = script.load_script( ns = script.load_script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/recorder/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
) )
) )
@ -33,27 +32,27 @@ async def test_load_script():
assert await tctx.master.await_log("No such file or directory") assert await tctx.master.await_log("No such file or directory")
script.load_script( script.load_script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/recorder/error.py" "mitmproxy/data/addonscripts/recorder/error.py"
) )
) )
assert await tctx.master.await_log("invalid syntax") assert await tctx.master.await_log("invalid syntax")
def test_load_fullname(): def test_load_fullname(tdata):
""" """
Test that loading two scripts at locations a/foo.py and b/foo.py works. Test that loading two scripts at locations a/foo.py and b/foo.py works.
This only succeeds if they get assigned different basenames. This only succeeds if they get assigned different basenames.
""" """
ns = script.load_script( ns = script.load_script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/addon.py" "mitmproxy/data/addonscripts/addon.py"
) )
) )
assert ns.addons assert ns.addons
ns2 = script.load_script( ns2 = script.load_script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/same_filename/addon.py" "mitmproxy/data/addonscripts/same_filename/addon.py"
) )
) )
@ -62,11 +61,11 @@ def test_load_fullname():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_script_print_stdout(): async def test_script_print_stdout(tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
with addonmanager.safecall(): with addonmanager.safecall():
ns = script.load_script( ns = script.load_script(
tutils.test_data.path("mitmproxy/data/addonscripts/print.py") tdata.path("mitmproxy/data/addonscripts/print.py")
) )
ns.load(addonmanager.Loader(tctx.master)) ns.load(addonmanager.Loader(tctx.master))
assert await tctx.master.await_log("stdoutprint") assert await tctx.master.await_log("stdoutprint")
@ -78,12 +77,12 @@ class TestScript:
with pytest.raises(exceptions.OptionsError): with pytest.raises(exceptions.OptionsError):
script.Script("nonexistent", False) script.Script("nonexistent", False)
def test_quotes_around_filename(self): def test_quotes_around_filename(self, tdata):
""" """
Test that a script specified as '"foo.py"' works to support the calling convention of Test that a script specified as '"foo.py"' works to support the calling convention of
mitmproxy 2.0, as e.g. used by Cuckoo Sandbox. mitmproxy 2.0, as e.g. used by Cuckoo Sandbox.
""" """
path = tutils.test_data.path("mitmproxy/data/addonscripts/recorder/recorder.py") path = tdata.path("mitmproxy/data/addonscripts/recorder/recorder.py")
s = script.Script( s = script.Script(
'"{}"'.format(path), '"{}"'.format(path),
@ -92,10 +91,10 @@ class TestScript:
assert '"' not in s.fullpath assert '"' not in s.fullpath
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_simple(self): async def test_simple(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = script.Script( sc = script.Script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/recorder/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
), ),
True, True,
@ -133,10 +132,10 @@ class TestScript:
raise AssertionError("No reload seen") raise AssertionError("No reload seen")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_exception(self): async def test_exception(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = script.Script( sc = script.Script(
tutils.test_data.path("mitmproxy/data/addonscripts/error.py"), tdata.path("mitmproxy/data/addonscripts/error.py"),
True, True,
) )
tctx.master.addons.add(sc) tctx.master.addons.add(sc)
@ -150,10 +149,10 @@ class TestScript:
assert await tctx.master.await_log("error.py") assert await tctx.master.await_log("error.py")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_addon(self): async def test_addon(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = script.Script( sc = script.Script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/addon.py" "mitmproxy/data/addonscripts/addon.py"
), ),
True True
@ -185,8 +184,8 @@ class TestCutTraceback:
class TestScriptLoader: class TestScriptLoader:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_script_run(self): async def test_script_run(self, tdata):
rp = tutils.test_data.path( rp = tdata.path(
"mitmproxy/data/addonscripts/recorder/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
) )
sc = script.ScriptLoader() sc = script.ScriptLoader()
@ -207,7 +206,7 @@ class TestScriptLoader:
sc.script_run([tflow.tflow(resp=True)], "/") sc.script_run([tflow.tflow(resp=True)], "/")
assert await tctx.master.await_log("/: No such script") assert await tctx.master.await_log("/: No such script")
def test_simple(self): def test_simple(self, tdata):
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context(loadcore=False) as tctx: with taddons.context(loadcore=False) as tctx:
tctx.master.addons.add(sc) tctx.master.addons.add(sc)
@ -215,7 +214,7 @@ class TestScriptLoader:
assert len(tctx.master.addons) == 1 assert len(tctx.master.addons) == 1
tctx.master.options.update( tctx.master.options.update(
scripts = [ scripts = [
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/recorder/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
) )
] ]
@ -236,28 +235,28 @@ class TestScriptLoader:
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_script_deletion(self): async def test_script_deletion(self, tdata):
tdir = tutils.test_data.path("mitmproxy/data/addonscripts/") tdir = tdata.path("mitmproxy/data/addonscripts/")
with open(tdir + "/dummy.py", 'w') as f: with open(tdir + "/dummy.py", 'w') as f:
f.write("\n") f.write("\n")
with taddons.context() as tctx: with taddons.context() as tctx:
sl = script.ScriptLoader() sl = script.ScriptLoader()
tctx.master.addons.add(sl) tctx.master.addons.add(sl)
tctx.configure(sl, scripts=[tutils.test_data.path("mitmproxy/data/addonscripts/dummy.py")]) tctx.configure(sl, scripts=[tdata.path("mitmproxy/data/addonscripts/dummy.py")])
await tctx.master.await_log("Loading") await tctx.master.await_log("Loading")
os.remove(tutils.test_data.path("mitmproxy/data/addonscripts/dummy.py")) os.remove(tdata.path("mitmproxy/data/addonscripts/dummy.py"))
await tctx.master.await_log("Removing") await tctx.master.await_log("Removing")
assert not tctx.options.scripts assert not tctx.options.scripts
assert not sl.addons assert not sl.addons
def test_load_err(self): def test_load_err(self, tdata):
sc = script.ScriptLoader() sc = script.ScriptLoader()
with taddons.context(sc, loadcore=False) as tctx: with taddons.context(sc, loadcore=False) as tctx:
tctx.configure(sc, scripts=[ tctx.configure(sc, scripts=[
tutils.test_data.path("mitmproxy/data/addonscripts/load_error.py") tdata.path("mitmproxy/data/addonscripts/load_error.py")
]) ])
try: try:
tctx.invoke(sc, "tick") tctx.invoke(sc, "tick")
@ -281,8 +280,8 @@ class TestScriptLoader:
assert await tctx.master.await_log("NoneType") assert await tctx.master.await_log("NoneType")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_order(self): async def test_order(self, tdata):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder") rec = tdata.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader() sc = script.ScriptLoader()
sc.is_running = True sc.is_running = True
with taddons.context() as tctx: with taddons.context() as tctx:

View File

@ -1,7 +1,6 @@
import pytest import pytest
from mitmproxy.contentviews.image import image_parser from mitmproxy.contentviews.image import image_parser
from mitmproxy.test import tutils
@pytest.mark.parametrize("filename, metadata", { @pytest.mark.parametrize("filename, metadata", {
@ -71,8 +70,8 @@ from mitmproxy.test import tutils
('date:modify', '2012-07-11T14:04:52-07:00') ('date:modify', '2012-07-11T14:04:52-07:00')
], ],
}.items()) }.items())
def test_parse_png(filename, metadata): def test_parse_png(filename, metadata, tdata):
with open(tutils.test_data.path(filename), "rb") as f: with open(tdata.path(filename), "rb") as f:
assert metadata == image_parser.parse_png(f.read()) assert metadata == image_parser.parse_png(f.read())
@ -101,8 +100,8 @@ def test_parse_png(filename, metadata):
('background', '0') ('background', '0')
], ],
}.items()) }.items())
def test_parse_gif(filename, metadata): def test_parse_gif(filename, metadata, tdata):
with open(tutils.test_data.path(filename), 'rb') as f: with open(tdata.path(filename), 'rb') as f:
assert metadata == image_parser.parse_gif(f.read()) assert metadata == image_parser.parse_gif(f.read())
@ -164,8 +163,8 @@ def test_parse_gif(filename, metadata):
('Size', '750 x 1055 px') ('Size', '750 x 1055 px')
], ],
}.items()) }.items())
def test_parse_jpeg(filename, metadata): def test_parse_jpeg(filename, metadata, tdata):
with open(tutils.test_data.path(filename), 'rb') as f: with open(tdata.path(filename), 'rb') as f:
assert metadata == image_parser.parse_jpeg(f.read()) assert metadata == image_parser.parse_jpeg(f.read())
@ -187,6 +186,6 @@ def test_parse_jpeg(filename, metadata):
) )
] ]
}.items()) }.items())
def test_ico(filename, metadata): def test_ico(filename, metadata, tdata):
with open(tutils.test_data.path(filename), 'rb') as f: with open(tdata.path(filename), 'rb') as f:
assert metadata == image_parser.parse_ico(f.read()) assert metadata == image_parser.parse_ico(f.read())

View File

@ -1,9 +1,8 @@
from mitmproxy.contentviews import image from mitmproxy.contentviews import image
from mitmproxy.test import tutils
from .. import full_eval from .. import full_eval
def test_view_image(): def test_view_image(tdata):
v = full_eval(image.ViewImage()) v = full_eval(image.ViewImage())
for img in [ for img in [
"mitmproxy/data/image.png", "mitmproxy/data/image.png",
@ -11,7 +10,7 @@ def test_view_image():
"mitmproxy/data/all.jpeg", "mitmproxy/data/all.jpeg",
"mitmproxy/data/image.ico", "mitmproxy/data/image.ico",
]: ]:
with open(tutils.test_data.path(img), "rb") as f: with open(tdata.path(img), "rb") as f:
viewname, lines = v(f.read()) viewname, lines = v(f.read())
assert img.split(".")[-1].upper() in viewname assert img.split(".")[-1].upper() in viewname

View File

@ -1,11 +1,8 @@
import pytest import pytest
from mitmproxy.contentviews import css from mitmproxy.contentviews import css
from mitmproxy.test import tutils
from . import full_eval from . import full_eval
data = tutils.test_data.push("mitmproxy/contentviews/test_css_data/")
@pytest.mark.parametrize("filename", [ @pytest.mark.parametrize("filename", [
"animation-keyframe.css", "animation-keyframe.css",
@ -19,8 +16,8 @@ data = tutils.test_data.push("mitmproxy/contentviews/test_css_data/")
"selectors.css", "selectors.css",
"simple.css", "simple.css",
]) ])
def test_beautify(filename): def test_beautify(filename, tdata):
path = data.path(filename) path = tdata.path("mitmproxy/contentviews/test_css_data/" + filename)
with open(path) as f: with open(path) as f:
input = f.read() input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f: with open("-formatted.".join(path.rsplit(".", 1))) as f:

View File

@ -1,11 +1,8 @@
import pytest import pytest
from mitmproxy.contentviews import javascript from mitmproxy.contentviews import javascript
from mitmproxy.test import tutils
from . import full_eval from . import full_eval
data = tutils.test_data.push("mitmproxy/contentviews/test_js_data/")
def test_view_javascript(): def test_view_javascript():
v = full_eval(javascript.ViewJavaScript()) v = full_eval(javascript.ViewJavaScript())
@ -22,8 +19,8 @@ def test_view_javascript():
@pytest.mark.parametrize("filename", [ @pytest.mark.parametrize("filename", [
"simple.js", "simple.js",
]) ])
def test_format_xml(filename): def test_format_xml(filename, tdata):
path = data.path(filename) path = tdata.path("mitmproxy/contentviews/test_js_data/" + filename)
with open(path) as f: with open(path) as f:
input = f.read() input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f: with open("-formatted.".join(path.rsplit(".", 1))) as f:

View File

@ -1,15 +1,14 @@
import pytest import pytest
from mitmproxy.contentviews import protobuf from mitmproxy.contentviews import protobuf
from mitmproxy.test import tutils
from . import full_eval from . import full_eval
data = tutils.test_data.push("mitmproxy/contentviews/test_protobuf_data/") datadir = "mitmproxy/contentviews/test_protobuf_data/"
def test_view_protobuf_request(): def test_view_protobuf_request(tdata):
v = full_eval(protobuf.ViewProtobuf()) v = full_eval(protobuf.ViewProtobuf())
p = data.path("protobuf01") p = tdata.path(datadir + "protobuf01")
with open(p, "rb") as f: with open(p, "rb") as f:
raw = f.read() raw = f.read()
@ -21,8 +20,8 @@ def test_view_protobuf_request():
@pytest.mark.parametrize("filename", ["protobuf02", "protobuf03"]) @pytest.mark.parametrize("filename", ["protobuf02", "protobuf03"])
def test_format_pbuf(filename): def test_format_pbuf(filename, tdata):
path = data.path(filename) path = tdata.path(datadir + filename)
with open(path, "rb") as f: with open(path, "rb") as f:
input = f.read() input = f.read()
with open(path + "-decoded") as f: with open(path + "-decoded") as f:

View File

@ -1,17 +1,16 @@
from mitmproxy.contentviews import wbxml from mitmproxy.contentviews import wbxml
from mitmproxy.test import tutils
from . import full_eval from . import full_eval
data = tutils.test_data.push("mitmproxy/contentviews/test_wbxml_data/") datadir = "mitmproxy/contentviews/test_wbxml_data/"
def test_wbxml(): def test_wbxml(tdata):
v = full_eval(wbxml.ViewWBXML()) v = full_eval(wbxml.ViewWBXML())
assert v(b'\x03\x01\x6A\x00') == ('WBXML', [[('text', '<?xml version="1.0" ?>')]]) assert v(b'\x03\x01\x6A\x00') == ('WBXML', [[('text', '<?xml version="1.0" ?>')]])
assert v(b'foo') is None assert v(b'foo') is None
path = data.path("data.wbxml") # File taken from https://github.com/davidpshaw/PyWBXMLDecoder/tree/master/wbxml_samples path = tdata.path(datadir + "data.wbxml") # File taken from https://github.com/davidpshaw/PyWBXMLDecoder/tree/master/wbxml_samples
with open(path, 'rb') as f: with open(path, 'rb') as f:
input = f.read() input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f: with open("-formatted.".join(path.rsplit(".", 1))) as f:

View File

@ -1,20 +1,19 @@
import pytest import pytest
from mitmproxy.contentviews import xml_html from mitmproxy.contentviews import xml_html
from mitmproxy.test import tutils
from . import full_eval from . import full_eval
data = tutils.test_data.push("mitmproxy/contentviews/test_xml_html_data/") datadir = "mitmproxy/contentviews/test_xml_html_data/"
def test_simple(): def test_simple(tdata):
v = full_eval(xml_html.ViewXmlHtml()) v = full_eval(xml_html.ViewXmlHtml())
assert v(b"foo") == ('XML', [[('text', 'foo')]]) assert v(b"foo") == ('XML', [[('text', 'foo')]])
assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]]) assert v(b"<html></html>") == ('HTML', [[('text', '<html></html>')]])
assert v(b"<>") == ('XML', [[('text', '<>')]]) assert v(b"<>") == ('XML', [[('text', '<>')]])
assert v(b"<p") == ('XML', [[('text', '<p')]]) assert v(b"<p") == ('XML', [[('text', '<p')]])
with open(data.path("simple.html")) as f: with open(tdata.path(datadir + "simple.html")) as f:
input = f.read() input = f.read()
tokens = xml_html.tokenize(input) tokens = xml_html.tokenize(input)
assert str(next(tokens)) == "Tag(<!DOCTYPE html>)" assert str(next(tokens)) == "Tag(<!DOCTYPE html>)"
@ -27,8 +26,8 @@ def test_simple():
"inline.html", "inline.html",
"test.html" "test.html"
]) ])
def test_format_xml(filename): def test_format_xml(filename, tdata):
path = data.path(filename) path = tdata.path(datadir + filename)
with open(path) as f: with open(path) as f:
input = f.read() input = f.read()
with open("-formatted.".join(path.rsplit(".", 1))) as f: with open("-formatted.".join(path.rsplit(".", 1))) as f:

View File

@ -2,27 +2,26 @@ import pytest
from mitmproxy import io from mitmproxy import io
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy.test import tutils
def test_load(): def test_load(tdata):
with open(tutils.test_data.path("mitmproxy/data/dumpfile-011"), "rb") as f: with open(tdata.path("mitmproxy/data/dumpfile-011"), "rb") as f:
flow_reader = io.FlowReader(f) flow_reader = io.FlowReader(f)
flows = list(flow_reader.stream()) flows = list(flow_reader.stream())
assert len(flows) == 1 assert len(flows) == 1
assert flows[0].request.url == "https://example.com/" assert flows[0].request.url == "https://example.com/"
def test_load_018(): def test_load_018(tdata):
with open(tutils.test_data.path("mitmproxy/data/dumpfile-018"), "rb") as f: with open(tdata.path("mitmproxy/data/dumpfile-018"), "rb") as f:
flow_reader = io.FlowReader(f) flow_reader = io.FlowReader(f)
flows = list(flow_reader.stream()) flows = list(flow_reader.stream())
assert len(flows) == 1 assert len(flows) == 1
assert flows[0].request.url == "https://www.example.com/" assert flows[0].request.url == "https://www.example.com/"
def test_cannot_convert(): def test_cannot_convert(tdata):
with open(tutils.test_data.path("mitmproxy/data/dumpfile-010"), "rb") as f: with open(tdata.path("mitmproxy/data/dumpfile-010"), "rb") as f:
flow_reader = io.FlowReader(f) flow_reader = io.FlowReader(f)
with pytest.raises(exceptions.FlowReadException): with pytest.raises(exceptions.FlowReadException):
list(flow_reader.stream()) list(flow_reader.stream())

View File

@ -12,12 +12,15 @@ from OpenSSL import SSL
from mitmproxy import certs from mitmproxy import certs
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy.test import tutils from mitmproxy.utils import data
from ...conftest import skip_no_ipv6 from ...conftest import skip_no_ipv6
from . import tservers from . import tservers
cdata = data.Data(__name__)
class EchoHandler(tcp.BaseHandler): class EchoHandler(tcp.BaseHandler):
sni = None sni = None
@ -172,7 +175,7 @@ class TestServerSSL(tservers.ServerTestBase):
handler = EchoHandler handler = EchoHandler
ssl = dict( ssl = dict(
cipher_list="AES256-SHA", cipher_list="AES256-SHA",
chain_file=tutils.test_data.path("mitmproxy/net/data/server.crt") chain_file=cdata.path("data/server.crt")
) )
def test_echo(self): def test_echo(self):
@ -209,14 +212,14 @@ class TestSSLv3Only(tservers.ServerTestBase):
class TestInvalidTrustFile(tservers.ServerTestBase): class TestInvalidTrustFile(tservers.ServerTestBase):
def test_invalid_trust_file_should_fail(self): def test_invalid_trust_file_should_fail(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises(exceptions.TlsException): with pytest.raises(exceptions.TlsException):
c.convert_to_tls( c.convert_to_tls(
sni="example.mitmproxy.org", sni="example.mitmproxy.org",
verify=SSL.VERIFY_PEER, verify=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/generate.py") ca_pemfile=tdata.path("mitmproxy/net/data/verificationcerts/generate.py")
) )
@ -224,8 +227,8 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
handler = EchoHandler handler = EchoHandler
ssl = dict( ssl = dict(
cert=tutils.test_data.path("mitmproxy/net/data/verificationcerts/self-signed.crt"), cert=cdata.path("data/verificationcerts/self-signed.crt"),
key=tutils.test_data.path("mitmproxy/net/data/verificationcerts/self-signed.key") key=cdata.path("data/verificationcerts/self-signed.key")
) )
def test_mode_default_should_pass(self): def test_mode_default_should_pass(self):
@ -255,14 +258,14 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
c.wfile.flush() c.wfile.flush()
assert c.rfile.readline() == testval assert c.rfile.readline() == testval
def test_mode_strict_should_fail(self): def test_mode_strict_should_fail(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises(exceptions.InvalidCertificateException): with pytest.raises(exceptions.InvalidCertificateException):
c.convert_to_tls( c.convert_to_tls(
sni="example.mitmproxy.org", sni="example.mitmproxy.org",
verify=SSL.VERIFY_PEER, verify=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ca_pemfile=tdata.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
) )
assert c.ssl_verification_error assert c.ssl_verification_error
@ -276,37 +279,37 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
handler = EchoHandler handler = EchoHandler
ssl = dict( ssl = dict(
cert=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.crt"), cert=cdata.path("data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.key") key=cdata.path("data/verificationcerts/trusted-leaf.key")
) )
def test_should_fail_without_sni(self): def test_should_fail_without_sni(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises(exceptions.TlsException): with pytest.raises(exceptions.TlsException):
c.convert_to_tls( c.convert_to_tls(
verify=SSL.VERIFY_PEER, verify=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ca_pemfile=tdata.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
) )
def test_mode_none_should_pass_without_sni(self): def test_mode_none_should_pass_without_sni(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
c.convert_to_tls( c.convert_to_tls(
verify=SSL.VERIFY_NONE, verify=SSL.VERIFY_NONE,
ca_path=tutils.test_data.path("mitmproxy/net/data/verificationcerts/") ca_path=tdata.path("mitmproxy/net/data/verificationcerts/")
) )
assert "'no-hostname' doesn't match" in str(c.ssl_verification_error) assert "'no-hostname' doesn't match" in str(c.ssl_verification_error)
def test_should_fail(self): def test_should_fail(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises(exceptions.InvalidCertificateException): with pytest.raises(exceptions.InvalidCertificateException):
c.convert_to_tls( c.convert_to_tls(
sni="mitmproxy.org", sni="mitmproxy.org",
verify=SSL.VERIFY_PEER, verify=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ca_pemfile=tdata.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
) )
assert c.ssl_verification_error assert c.ssl_verification_error
@ -315,17 +318,17 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
handler = EchoHandler handler = EchoHandler
ssl = dict( ssl = dict(
cert=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.crt"), cert=cdata.path("data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-leaf.key") key=cdata.path("data/verificationcerts/trusted-leaf.key")
) )
def test_mode_strict_w_pemfile_should_pass(self): def test_mode_strict_w_pemfile_should_pass(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
c.convert_to_tls( c.convert_to_tls(
sni="example.mitmproxy.org", sni="example.mitmproxy.org",
verify=SSL.VERIFY_PEER, verify=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ca_pemfile=tdata.path("mitmproxy/net/data/verificationcerts/trusted-root.crt")
) )
assert c.ssl_verification_error is None assert c.ssl_verification_error is None
@ -335,13 +338,13 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
c.wfile.flush() c.wfile.flush()
assert c.rfile.readline() == testval assert c.rfile.readline() == testval
def test_mode_strict_w_cadir_should_pass(self): def test_mode_strict_w_cadir_should_pass(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
c.convert_to_tls( c.convert_to_tls(
sni="example.mitmproxy.org", sni="example.mitmproxy.org",
verify=SSL.VERIFY_PEER, verify=SSL.VERIFY_PEER,
ca_path=tutils.test_data.path("mitmproxy/net/data/verificationcerts/") ca_path=tdata.path("mitmproxy/net/data/verificationcerts/")
) )
assert c.ssl_verification_error is None assert c.ssl_verification_error is None
@ -369,18 +372,18 @@ class TestSSLClientCert(tservers.ServerTestBase):
v3_only=False v3_only=False
) )
def test_clientcert(self): def test_clientcert(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
c.convert_to_tls( c.convert_to_tls(
cert=tutils.test_data.path("mitmproxy/net/data/clientcert/client.pem")) cert=tdata.path("mitmproxy/net/data/clientcert/client.pem"))
assert c.rfile.readline().strip() == b"1" assert c.rfile.readline().strip() == b"1"
def test_clientcert_err(self): def test_clientcert_err(self, tdata):
c = tcp.TCPClient(("127.0.0.1", self.port)) c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect(): with c.connect():
with pytest.raises(exceptions.TlsException): with pytest.raises(exceptions.TlsException):
c.convert_to_tls(cert=tutils.test_data.path("mitmproxy/net/data/clientcert/make")) c.convert_to_tls(cert=tdata.path("mitmproxy/net/data/clientcert/make"))
class TestSNI(tservers.ServerTestBase): class TestSNI(tservers.ServerTestBase):
@ -597,7 +600,7 @@ class TestDHParams(tservers.ServerTestBase):
handler = HangHandler handler = HangHandler
ssl = dict( ssl = dict(
dhparams=certs.CertStore.load_dhparam( dhparams=certs.CertStore.load_dhparam(
tutils.test_data.path("mitmproxy/net/data/dhparam.pem"), cdata.path("data/dhparam.pem"),
), ),
cipher_list="DHE-RSA-AES256-SHA" cipher_list="DHE-RSA-AES256-SHA"
) )

View File

@ -4,7 +4,9 @@ import io
import OpenSSL import OpenSSL
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy.test import tutils from mitmproxy.utils import data
cdata = data.Data(__name__)
class _ServerThread(threading.Thread): class _ServerThread(threading.Thread):
@ -47,10 +49,10 @@ class _TServer(tcp.TCPServer):
if self.ssl is not None: if self.ssl is not None:
cert = self.ssl.get( cert = self.ssl.get(
"cert", "cert",
tutils.test_data.path("mitmproxy/net/data/server.crt")) cdata.path("data/server.crt"))
raw_key = self.ssl.get( raw_key = self.ssl.get(
"key", "key",
tutils.test_data.path("mitmproxy/net/data/server.key")) cdata.path("data/server.key"))
with open(raw_key) as f: with open(raw_key) as f:
raw_key = f.read() raw_key = f.read()
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw_key) key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw_key)

View File

@ -1,16 +1,15 @@
import sys import sys
import pytest import pytest
from mitmproxy.platform import pf from mitmproxy.platform import pf
from mitmproxy.test import tutils
class TestLookup: class TestLookup:
def test_simple(self): def test_simple(self, tdata):
if sys.platform == "freebsd10": if sys.platform == "freebsd10":
p = tutils.test_data.path("mitmproxy/data/pf02") p = tdata.path("mitmproxy/data/pf02")
else: else:
p = tutils.test_data.path("mitmproxy/data/pf01") p = tdata.path("mitmproxy/data/pf01")
with open(p, "rb") as f: with open(p, "rb") as f:
d = f.read() d = f.read()

View File

@ -3,7 +3,6 @@ import pytest
from mitmproxy import options from mitmproxy import options
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy.proxy.config import ProxyConfig from mitmproxy.proxy.config import ProxyConfig
from mitmproxy.test import tutils
class TestProxyConfig: class TestProxyConfig:
@ -13,8 +12,8 @@ class TestProxyConfig:
with pytest.raises(exceptions.OptionsError, match="parent directory does not exist"): with pytest.raises(exceptions.OptionsError, match="parent directory does not exist"):
ProxyConfig(opts) ProxyConfig(opts)
def test_invalid_certificate(self): def test_invalid_certificate(self, tdata):
opts = options.Options() opts = options.Options()
opts.certs = [tutils.test_data.path("mitmproxy/data/dumpfile-011")] opts.certs = [tdata.path("mitmproxy/data/dumpfile-011")]
with pytest.raises(exceptions.OptionsError, match="Invalid certificate format"): with pytest.raises(exceptions.OptionsError, match="Invalid certificate format"):
ProxyConfig(opts) ProxyConfig(opts)

View File

@ -16,13 +16,16 @@ from mitmproxy.net import socks
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy.net.http import http1 from mitmproxy.net.http import http1
from mitmproxy.proxy.config import HostMatcher from mitmproxy.proxy.config import HostMatcher
from mitmproxy.test import tutils from mitmproxy.utils import data
from pathod import pathoc from pathod import pathoc
from pathod import pathod from pathod import pathod
from .. import tservers from .. import tservers
from ...conftest import skip_appveyor from ...conftest import skip_appveyor
cdata = data.Data(__name__)
class CommonMixin: class CommonMixin:
def test_large(self): def test_large(self):
@ -257,9 +260,9 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):
assert resp.status_code == 400 assert resp.status_code == 400
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stream_modify(self): async def test_stream_modify(self, tdata):
s = script.Script( s = script.Script(
tutils.test_data.path("mitmproxy/data/addonscripts/stream_modify.py"), tdata.path("mitmproxy/data/addonscripts/stream_modify.py"),
False, False,
) )
self.set_addons(s) self.set_addons(s)
@ -288,19 +291,19 @@ class TestHTTPS(tservers.HTTPProxyTest, CommonMixin, TcpMixin):
ssl = True ssl = True
ssloptions = pathod.SSLOptions(request_client_cert=True) ssloptions = pathod.SSLOptions(request_client_cert=True)
def test_clientcert_file(self): def test_clientcert_file(self, tdata):
try: try:
self.options.client_certs = os.path.join( self.options.client_certs = os.path.join(
tutils.test_data.path("mitmproxy/data/clientcert"), "client.pem") tdata.path("mitmproxy/data/clientcert"), "client.pem")
f = self.pathod("304") f = self.pathod("304")
assert f.status_code == 304 assert f.status_code == 304
assert self.server.last_log()["request"]["clientcert"]["keyinfo"] assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
finally: finally:
self.options.client_certs = None self.options.client_certs = None
def test_clientcert_dir(self): def test_clientcert_dir(self, tdata):
try: try:
self.options.client_certs = tutils.test_data.path("mitmproxy/data/clientcert") self.options.client_certs = tdata.path("mitmproxy/data/clientcert")
f = self.pathod("304") f = self.pathod("304")
assert f.status_code == 304 assert f.status_code == 304
assert self.server.last_log()["request"]["clientcert"]["keyinfo"] assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
@ -339,7 +342,7 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
cn=b"example.mitmproxy.org", cn=b"example.mitmproxy.org",
certs=[ certs=[
("example.mitmproxy.org", tutils.test_data.path("mitmproxy/data/servercert/trusted-leaf.pem")) ("example.mitmproxy.org", cdata.path("../data/servercert/trusted-leaf.pem"))
] ]
) )
@ -348,21 +351,21 @@ class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxyTest):
with p.connect(): with p.connect():
return p.request("get:/p/242") return p.request("get:/p/242")
def test_verification_w_cadir(self): def test_verification_w_cadir(self, tdata):
self.options.update( self.options.update(
ssl_insecure=False, ssl_insecure=False,
ssl_verify_upstream_trusted_cadir=tutils.test_data.path( ssl_verify_upstream_trusted_cadir=tdata.path(
"mitmproxy/data/servercert/" "mitmproxy/data/servercert/"
), ),
ssl_verify_upstream_trusted_ca=None, ssl_verify_upstream_trusted_ca=None,
) )
assert self._request().status_code == 242 assert self._request().status_code == 242
def test_verification_w_pemfile(self): def test_verification_w_pemfile(self, tdata):
self.options.update( self.options.update(
ssl_insecure=False, ssl_insecure=False,
ssl_verify_upstream_trusted_cadir=None, ssl_verify_upstream_trusted_cadir=None,
ssl_verify_upstream_trusted_ca=tutils.test_data.path( ssl_verify_upstream_trusted_ca=tdata.path(
"mitmproxy/data/servercert/trusted-root.pem" "mitmproxy/data/servercert/trusted-root.pem"
), ),
) )
@ -378,7 +381,7 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest):
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
cn=b"example.mitmproxy.org", cn=b"example.mitmproxy.org",
certs=[ certs=[
("example.mitmproxy.org", tutils.test_data.path("mitmproxy/data/servercert/self-signed.pem")) ("example.mitmproxy.org", cdata.path("../data/servercert/self-signed.pem"))
]) ])
def _request(self): def _request(self):
@ -389,8 +392,8 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest):
@classmethod @classmethod
def get_options(cls): def get_options(cls):
opts = super().get_options() opts = super().get_options()
opts.ssl_verify_upstream_trusted_ca = tutils.test_data.path( opts.ssl_verify_upstream_trusted_ca = cdata.path(
"mitmproxy/data/servercert/trusted-root.pem" "../data/servercert/trusted-root.pem"
) )
return opts return opts
@ -417,7 +420,7 @@ class TestHTTPSNoCommonName(tservers.HTTPProxyTest):
ssl = True ssl = True
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
certs=[ certs=[
("*", tutils.test_data.path("mitmproxy/data/no_common_name.pem")) ("*", cdata.path("../data/no_common_name.pem"))
] ]
) )
@ -566,9 +569,9 @@ class TestHttps2Http(tservers.ReverseProxyTest):
class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin): class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
ssl = False ssl = False
def test_tcp_stream_modify(self): def test_tcp_stream_modify(self, tdata):
s = script.Script( s = script.Script(
tutils.test_data.path("mitmproxy/data/addonscripts/tcp_stream_modify.py"), tdata.path("mitmproxy/data/addonscripts/tcp_stream_modify.py"),
False, False,
) )
self.set_addons(s) self.set_addons(s)
@ -1069,7 +1072,7 @@ class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxyTest):
class AddUpstreamCertsToClientChainMixin: class AddUpstreamCertsToClientChainMixin:
ssl = True ssl = True
servercert = tutils.test_data.path("mitmproxy/data/servercert/trusted-root.pem") servercert = cdata.path("../data/servercert/trusted-root.pem")
ssloptions = pathod.SSLOptions( ssloptions = pathod.SSLOptions(
cn=b"example.mitmproxy.org", cn=b"example.mitmproxy.org",
certs=[ certs=[

View File

@ -1,7 +1,6 @@
import pytest import pytest
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy import controller from mitmproxy import controller
@ -17,10 +16,10 @@ class Thing:
class TestConcurrent(tservers.MasterTest): class TestConcurrent(tservers.MasterTest):
def test_concurrent(self): def test_concurrent(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script( sc = tctx.script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator.py" "mitmproxy/data/addonscripts/concurrent_decorator.py"
) )
) )
@ -34,19 +33,19 @@ class TestConcurrent(tservers.MasterTest):
raise ValueError("Script never acked") raise ValueError("Script never acked")
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_concurrent_err(self): async def test_concurrent_err(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
tctx.script( tctx.script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator_err.py" "mitmproxy/data/addonscripts/concurrent_decorator_err.py"
) )
) )
assert await tctx.master.await_log("decorator not supported") assert await tctx.master.await_log("decorator not supported")
def test_concurrent_class(self): def test_concurrent_class(self, tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
sc = tctx.script( sc = tctx.script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/concurrent_decorator_class.py" "mitmproxy/data/addonscripts/concurrent_decorator_class.py"
) )
) )

View File

@ -1,6 +1,5 @@
import os import os
from mitmproxy import certs from mitmproxy import certs
from mitmproxy.test import tutils
# class TestDNTree: # class TestDNTree:
# def test_simple(self): # def test_simple(self):
@ -138,14 +137,14 @@ class TestDummyCert:
class TestCert: class TestCert:
def test_simple(self): def test_simple(self, tdata):
with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f: with open(tdata.path("mitmproxy/net/data/text_cert"), "rb") as f:
d = f.read() d = f.read()
c1 = certs.Cert.from_pem(d) c1 = certs.Cert.from_pem(d)
assert c1.cn == b"google.com" assert c1.cn == b"google.com"
assert len(c1.altnames) == 436 assert len(c1.altnames) == 436
with open(tutils.test_data.path("mitmproxy/net/data/text_cert_2"), "rb") as f: with open(tdata.path("mitmproxy/net/data/text_cert_2"), "rb") as f:
d = f.read() d = f.read()
c2 = certs.Cert.from_pem(d) c2 = certs.Cert.from_pem(d)
assert c2.cn == b"www.inode.co.nz" assert c2.cn == b"www.inode.co.nz"
@ -162,21 +161,21 @@ class TestCert:
assert c1 != c2 assert c1 != c2
def test_err_broken_sans(self): def test_err_broken_sans(self, tdata):
with open(tutils.test_data.path("mitmproxy/net/data/text_cert_weird1"), "rb") as f: with open(tdata.path("mitmproxy/net/data/text_cert_weird1"), "rb") as f:
d = f.read() d = f.read()
c = certs.Cert.from_pem(d) c = certs.Cert.from_pem(d)
# This breaks unless we ignore a decoding error. # This breaks unless we ignore a decoding error.
assert c.altnames is not None assert c.altnames is not None
def test_der(self): def test_der(self, tdata):
with open(tutils.test_data.path("mitmproxy/net/data/dercert"), "rb") as f: with open(tdata.path("mitmproxy/net/data/dercert"), "rb") as f:
d = f.read() d = f.read()
s = certs.Cert.from_der(d) s = certs.Cert.from_der(d)
assert s.cn assert s.cn
def test_state(self): def test_state(self, tdata):
with open(tutils.test_data.path("mitmproxy/net/data/text_cert"), "rb") as f: with open(tdata.path("mitmproxy/net/data/text_cert"), "rb") as f:
d = f.read() d = f.read()
c = certs.Cert.from_pem(d) c = certs.Cert.from_pem(d)

View File

@ -10,7 +10,6 @@ from mitmproxy import exceptions
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy.net.http import http1 from mitmproxy.net.http import http1
from mitmproxy.test import tflow from mitmproxy.test import tflow
from mitmproxy.test import tutils
from .net import tservers from .net import tservers
from pathod import test from pathod import test
@ -185,7 +184,7 @@ class TestClientConnectionTLS:
None, None,
"example.com" "example.com"
]) ])
def test_tls_with_sni(self, sni): def test_tls_with_sni(self, sni, tdata):
address = ('127.0.0.1', 0) address = ('127.0.0.1', 0)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@ -206,8 +205,8 @@ class TestClientConnectionTLS:
connection, client_address = sock.accept() connection, client_address = sock.accept()
c = connections.ClientConnection(connection, client_address, None) c = connections.ClientConnection(connection, client_address, None)
cert = tutils.test_data.path("mitmproxy/net/data/server.crt") cert = tdata.path("mitmproxy/net/data/server.crt")
with open(tutils.test_data.path("mitmproxy/net/data/server.key")) as f: with open(tdata.path("mitmproxy/net/data/server.key")) as f:
raw_key = f.read() raw_key = f.read()
key = OpenSSL.crypto.load_privatekey( key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_PEM,
@ -230,10 +229,12 @@ class TestServerConnectionTLS(tservers.ServerTestBase):
@pytest.mark.parametrize("client_certs", [ @pytest.mark.parametrize("client_certs", [
None, None,
tutils.test_data.path("mitmproxy/data/clientcert"), "mitmproxy/data/clientcert",
tutils.test_data.path("mitmproxy/data/clientcert/client.pem"), "mitmproxy/data/clientcert/client.pem",
]) ])
def test_tls(self, client_certs): def test_tls(self, client_certs, tdata):
if client_certs:
client_certs = tdata.path(client_certs)
c = connections.ServerConnection(("127.0.0.1", self.port)) c = connections.ServerConnection(("127.0.0.1", self.port))
c.connect() c.connect()
c.establish_tls(client_certs=client_certs) c.establish_tls(client_certs=client_certs)

View File

@ -8,7 +8,6 @@ from mitmproxy import options
from mitmproxy.proxy import ProxyConfig from mitmproxy.proxy import ProxyConfig
from mitmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler from mitmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
from mitmproxy.proxy import config from mitmproxy.proxy import config
from mitmproxy.test import tutils
from ..conftest import skip_windows from ..conftest import skip_windows
@ -42,10 +41,10 @@ class TestProcessProxyOptions:
def test_simple(self): def test_simple(self):
assert self.p() assert self.p()
def test_certs(self): def test_certs(self, tdata):
self.assert_noerr( self.assert_noerr(
"--cert", "--cert",
tutils.test_data.path("mitmproxy/data/testkey.pem")) tdata.path("mitmproxy/data/testkey.pem"))
with pytest.raises(Exception, match="does not exist"): with pytest.raises(Exception, match="does not exist"):
self.p("--cert", "nonexistent") self.p("--cert", "nonexistent")

View File

@ -3,7 +3,6 @@ import io
import pytest import pytest
from mitmproxy.test import taddons from mitmproxy.test import taddons
from mitmproxy.test import tutils
from mitmproxy import ctx from mitmproxy import ctx
@ -27,10 +26,10 @@ async def test_dumplog():
assert s.getvalue() assert s.getvalue()
def test_load_script(): def test_load_script(tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
s = tctx.script( s = tctx.script(
tutils.test_data.path( tdata.path(
"mitmproxy/data/addonscripts/recorder/recorder.py" "mitmproxy/data/addonscripts/recorder/recorder.py"
) )
) )

View File

@ -3,7 +3,6 @@ import os
import typing import typing
import contextlib import contextlib
from mitmproxy.test import tutils
import mitmproxy.exceptions import mitmproxy.exceptions
import mitmproxy.types import mitmproxy.types
from mitmproxy.test import taddons from mitmproxy.test import taddons
@ -64,7 +63,7 @@ def test_int():
b.parse(tctx.master.commands, int, "foo") b.parse(tctx.master.commands, int, "foo")
def test_path(): def test_path(tdata):
with taddons.context() as tctx: with taddons.context() as tctx:
b = mitmproxy.types._PathType() b = mitmproxy.types._PathType()
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo" assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
@ -80,7 +79,7 @@ def test_path():
ret.append(s) ret.append(s)
return ret return ret
cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) cd = os.path.normpath(tdata.path("mitmproxy/completion"))
assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/']
assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac']
with chdir(cd): with chdir(cd):

View File

@ -1,23 +1,23 @@
import asyncio import asyncio
from mitmproxy.tools import main from mitmproxy.tools import main
from mitmproxy.test import tutils
shutdown_script = tutils.test_data.path("mitmproxy/data/addonscripts/shutdown.py")
def test_mitmweb(event_loop): shutdown_script = "mitmproxy/data/addonscripts/shutdown.py"
def test_mitmweb(event_loop, tdata):
asyncio.set_event_loop(event_loop) asyncio.set_event_loop(event_loop)
main.mitmweb([ main.mitmweb([
"--no-web-open-browser", "--no-web-open-browser",
"-s", shutdown_script, "-s", tdata.path(shutdown_script),
"-q", "-p", "0", "-q", "-p", "0",
]) ])
def test_mitmdump(event_loop): def test_mitmdump(event_loop, tdata):
asyncio.set_event_loop(event_loop) asyncio.set_event_loop(event_loop)
main.mitmdump([ main.mitmdump([
"-s", shutdown_script, "-s", tdata.path(shutdown_script),
"-q", "-p", "0", "-q", "-p", "0",
]) ])

View File

@ -61,10 +61,10 @@ class TestDaemonSSL(PathocTestDaemon):
def test_showssl(self): def test_showssl(self):
assert "certificate chain" in self.tval(["get:/p/200"], showssl=True) assert "certificate chain" in self.tval(["get:/p/200"], showssl=True)
def test_clientcert(self): def test_clientcert(self, tdata):
self.tval( self.tval(
["get:/p/200"], ["get:/p/200"],
clientcert=tutils.test_data.path("pathod/data/clientcert/client.pem"), clientcert=tdata.path("pathod/data/clientcert/client.pem"),
) )
log = self.d.log() log = self.d.log()
assert log[0]["request"]["clientcert"]["keyinfo"] assert log[0]["request"]["clientcert"]["keyinfo"]

View File

@ -4,11 +4,9 @@ from unittest import mock
from pathod import pathoc_cmdline as cmdline from pathod import pathoc_cmdline as cmdline
from mitmproxy.test import tutils
@mock.patch("argparse.ArgumentParser.error") @mock.patch("argparse.ArgumentParser.error")
def test_pathoc(perror): def test_pathoc(perror, tdata):
assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"]) assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"])
s = io.StringIO() s = io.StringIO()
with pytest.raises(SystemExit): with pytest.raises(SystemExit):
@ -53,7 +51,7 @@ def test_pathoc(perror):
[ [
"pathoc", "pathoc",
"foo.com:8888", "foo.com:8888",
tutils.test_data.path("pathod/data/request") tdata.path("pathod/data/request")
] ]
) )
assert len(list(a.requests)) == 1 assert len(list(a.requests)) == 1

View File

@ -5,11 +5,14 @@ import pytest
from pathod import pathod from pathod import pathod
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy import exceptions from mitmproxy import exceptions
from mitmproxy.test import tutils from mitmproxy.utils import data
from . import tservers from . import tservers
cdata = data.Data(__name__)
class TestPathod: class TestPathod:
def test_logging(self): def test_logging(self):
@ -57,7 +60,7 @@ class TestNotAfterConnect(tservers.DaemonTests):
class TestCustomCert(tservers.DaemonTests): class TestCustomCert(tservers.DaemonTests):
ssl = True ssl = True
ssloptions = dict( ssloptions = dict(
certs=[("*", tutils.test_data.path("pathod/data/testkey.pem"))], certs=[("*", cdata.path("data/testkey.pem"))],
) )
def test_connect(self): def test_connect(self):

View File

@ -2,8 +2,6 @@ from unittest import mock
from pathod import pathod_cmdline as cmdline from pathod import pathod_cmdline as cmdline
from mitmproxy.test import tutils
def test_parse_anchor_spec(): def test_parse_anchor_spec():
assert cmdline.parse_anchor_spec("foo=200") == ("foo", "200") assert cmdline.parse_anchor_spec("foo=200") == ("foo", "200")
@ -11,14 +9,14 @@ def test_parse_anchor_spec():
@mock.patch("argparse.ArgumentParser.error") @mock.patch("argparse.ArgumentParser.error")
def test_pathod(perror): def test_pathod(perror, tdata):
assert cmdline.args_pathod(["pathod"]) assert cmdline.args_pathod(["pathod"])
a = cmdline.args_pathod( a = cmdline.args_pathod(
[ [
"pathod", "pathod",
"--cert", "--cert",
tutils.test_data.path("pathod/data/testkey.pem") tdata.path("pathod/data/testkey.pem")
] ]
) )
assert a.ssl_certs assert a.ssl_certs
@ -46,7 +44,7 @@ def test_pathod(perror):
[ [
"pathod", "pathod",
"-a", "-a",
"foo=" + tutils.test_data.path("pathod/data/response") "foo=" + tdata.path("pathod/data/response")
] ]
) )
assert a.anchors assert a.anchors

View File

@ -8,7 +8,7 @@ import urllib
from mitmproxy.net import tcp from mitmproxy.net import tcp
from mitmproxy.test import tutils from mitmproxy.utils import data
from pathod import language from pathod import language
from pathod import pathoc from pathod import pathoc
@ -17,6 +17,9 @@ from pathod import test
from pathod.pathod import CA_CERT_NAME from pathod.pathod import CA_CERT_NAME
cdata = data.Data(__name__)
def treader(bytes): def treader(bytes):
""" """
Construct a tcp.Read object from bytes. Construct a tcp.Read object from bytes.
@ -41,7 +44,7 @@ class DaemonTests:
opts["confdir"] = cls.confdir opts["confdir"] = cls.confdir
so = pathod.SSLOptions(**opts) so = pathod.SSLOptions(**opts)
cls.d = test.Daemon( cls.d = test.Daemon(
staticdir=tutils.test_data.path("pathod/data"), staticdir=cdata.path("data"),
anchors=[ anchors=[
(re.compile("/anchor/.*"), "202:da") (re.compile("/anchor/.*"), "202:da")
], ],