2017-02-01 15:48:46 +00:00
|
|
|
import pytest
|
2016-08-03 10:58:41 +00:00
|
|
|
|
2016-07-14 04:20:27 +00:00
|
|
|
from mitmproxy import options
|
|
|
|
from mitmproxy import contentviews
|
2016-10-18 23:37:32 +00:00
|
|
|
from mitmproxy import proxy
|
2016-10-19 00:22:50 +00:00
|
|
|
from mitmproxy import master
|
2017-02-01 15:48:46 +00:00
|
|
|
from mitmproxy.addons import script
|
2016-08-03 10:58:41 +00:00
|
|
|
|
2017-02-01 15:48:46 +00:00
|
|
|
from mitmproxy.test import tflow
|
2016-11-01 20:44:18 +00:00
|
|
|
from mitmproxy.test import tutils
|
2016-10-19 22:56:38 +00:00
|
|
|
from mitmproxy.net.http import Headers
|
2016-08-03 10:58:41 +00:00
|
|
|
|
2017-03-22 11:02:18 +00:00
|
|
|
from ..mitmproxy import tservers
|
2016-03-07 03:42:10 +00:00
|
|
|
|
2016-11-01 20:44:18 +00:00
|
|
|
# Handle used by tscript() below to turn a relative example name such as
# "simple/add_header.py" into a script path via example_dir.path(...).
# NOTE(review): presumably push("../examples") re-roots the shared test-data
# helper at the repository's examples directory -- confirm against tutils.
example_dir = tutils.test_data.push("../examples")
|
2016-03-07 03:49:29 +00:00
|
|
|
|
2016-05-19 01:46:42 +00:00
|
|
|
|
2016-07-14 04:20:27 +00:00
|
|
|
class ScriptError(Exception):
    """Raised by RaiseMaster.add_log for "warn" and "error" log entries."""
|
2016-03-07 03:49:29 +00:00
|
|
|
|
2016-03-07 03:58:09 +00:00
|
|
|
|
2016-10-19 00:22:50 +00:00
|
|
|
class RaiseMaster(master.Master):
    """Master whose logging hook turns warnings and errors into exceptions.

    Entries logged at level "warn" or "error" raise ScriptError instead of
    being recorded, so the tests below fail on them.
    """

    def add_log(self, e, level):
        """Raise ScriptError(e) for "warn"/"error" entries; ignore the rest.

        Args:
            e: The log payload; passed unchanged to ScriptError.
            level: The log level string.

        Raises:
            ScriptError: If level is "warn" or "error".
        """
        if level not in ("warn", "error"):
            return
        raise ScriptError(e)
|
2016-03-07 03:58:09 +00:00
|
|
|
|
2016-03-07 06:43:15 +00:00
|
|
|
|
2016-07-14 04:20:27 +00:00
|
|
|
def tscript(cmd, args=""):
    """Load an example script into a fresh RaiseMaster.

    Args:
        cmd: Script name handed to example_dir.path(), e.g.
            "simple/add_header.py".
        args: Argument string appended to the script path after a single
            space. Defaults to the empty string.

    Returns:
        A (master, script) tuple: the RaiseMaster instance and the
        script.Script addon that was added to it.
    """
    opts = options.Options()
    # Same result as path + " " + args: the separator is always present,
    # even when args is empty.
    command = " ".join((example_dir.path(cmd), args))
    tmaster = RaiseMaster(opts, proxy.DummyServer())
    addon = script.Script(command)
    tmaster.addons.add(addon)
    return tmaster, addon
|
2016-03-07 06:43:15 +00:00
|
|
|
|
2016-03-09 18:21:29 +00:00
|
|
|
|
2017-02-10 21:12:24 +00:00
|
|
|
class TestScripts(tservers.MasterTest):
    """Load example scripts via tscript() and check their effect on test flows."""

    def test_add_header(self):
        """simple/add_header.py: the response ends up with newheader == "foo"."""
        tmaster, _ = tscript("simple/add_header.py")
        flow = tflow.tflow(resp=tutils.tresp())
        tmaster.addons.handle_lifecycle("response", flow)
        assert flow.response.headers["newheader"] == "foo"

    def test_custom_contentviews(self):
        """simple/custom_contentview.py: a "swapcase" view is available afterwards."""
        # Both return values are kept bound for the duration of the test,
        # exactly as before, even though neither is read again.
        _tmaster, _addon = tscript("simple/custom_contentview.py")
        view = contentviews.get("swapcase")
        _, rendered = view(b"<html>Test!</html>")
        assert any(b'tEST!' in entry[0][1] for entry in rendered)

    def test_iframe_injector(self):
        """simple/modify_body_inject_iframe.py: needs an argument, then injects it."""
        # Without an argument, loading the script is expected to fail.
        with pytest.raises(ScriptError):
            tscript("simple/modify_body_inject_iframe.py")

        tmaster, _addon = tscript(
            "simple/modify_body_inject_iframe.py",
            "http://example.org/evil_iframe",
        )
        flow = tflow.tflow(
            resp=tutils.tresp(content=b"<html><body>mitmproxy</body></html>")
        )
        tmaster.addons.handle_lifecycle("response", flow)
        body = flow.response.content
        assert b'iframe' in body
        assert b'evil_iframe' in body

    def test_modify_form(self):
        """simple/modify_form.py: check the form with and without a form content type."""
        tmaster, _addon = tscript("simple/modify_form.py")

        urlencoded = Headers(content_type="application/x-www-form-urlencoded")
        flow = tflow.tflow(req=tutils.treq(headers=urlencoded))
        tmaster.addons.handle_lifecycle("request", flow)

        assert flow.request.urlencoded_form["mitmproxy"] == "rocks"

        # Second pass: blank the content type and handle the request again.
        flow.request.headers["content-type"] = ""
        tmaster.addons.handle_lifecycle("request", flow)
        form_items = list(flow.request.urlencoded_form.items())
        assert form_items == [("foo", "bar")]

    def test_modify_querystring(self):
        """simple/modify_querystring.py: query gets mitmproxy=rocks in both cases."""
        tmaster, _addon = tscript("simple/modify_querystring.py")
        flow = tflow.tflow(req=tutils.treq(path="/search?q=term"))

        tmaster.addons.handle_lifecycle("request", flow)
        assert flow.request.query["mitmproxy"] == "rocks"

        # Same expectation when the path carries no query string at all.
        flow.request.path = "/"
        tmaster.addons.handle_lifecycle("request", flow)
        assert flow.request.query["mitmproxy"] == "rocks"

    def test_arguments(self):
        """simple/script_arguments.py with "mitmproxy rocks": body text is rewritten."""
        tmaster, _addon = tscript("simple/script_arguments.py", "mitmproxy rocks")
        flow = tflow.tflow(resp=tutils.tresp(content=b"I <3 mitmproxy"))
        tmaster.addons.handle_lifecycle("response", flow)
        assert flow.response.content == b"I <3 rocks"

    def test_redirect_requests(self):
        """simple/redirect_requests.py: example.org requests end up at mitmproxy.org."""
        tmaster, _addon = tscript("simple/redirect_requests.py")
        flow = tflow.tflow(req=tutils.treq(host="example.org"))
        tmaster.addons.handle_lifecycle("request", flow)
        assert flow.request.host == "mitmproxy.org"

    def test_send_reply_from_proxy(self):
        """simple/send_reply_from_proxy.py: the flow gets a b"Hello World" response."""
        tmaster, _addon = tscript("simple/send_reply_from_proxy.py")
        flow = tflow.tflow(req=tutils.treq(host="example.com", port=80))
        tmaster.addons.handle_lifecycle("request", flow)
        assert flow.response.content == b"Hello World"

    def test_dns_spoofing(self):
        """complex/dns_spoofing.py: scheme, port and Host header are restored."""
        tmaster, _addon = tscript("complex/dns_spoofing.py")
        expected_host = "example.com"

        flow = tflow.tflow(
            req=tutils.treq(headers=Headers(host=expected_host), port=80)
        )

        tmaster.addons.handle_lifecycle("requestheaders", flow)

        # Rewrite by reverse proxy mode
        flow.request.scheme = "https"
        flow.request.port = 443

        tmaster.addons.handle_lifecycle("request", flow)

        assert flow.request.scheme == "http"
        assert flow.request.port == 80

        assert flow.request.headers["Host"] == expected_host
|